Skip to content

Commit

Permalink
Merge pull request #36 from felix5572/master
Browse files Browse the repository at this point in the history
update docs with dargs
  • Loading branch information
y1xiaoc committed Jun 4, 2021
2 parents c4ed51f + 1fda035 commit 7fadb30
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 36 deletions.
78 changes: 78 additions & 0 deletions doc/machine-auto.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
machine:
| type: ``dict``
| argument path: ``machine``
batch_type:
| type: ``str``
| argument path: ``machine/batch_type``
The batch job system type. Option: Slurm, PBS, LSF, Shell, DpCloudServer

context_type:
| type: ``str``
| argument path: ``machine/context_type``
The connection used to access the remote machine. Option: LocalContext, LazyLocalContext, SSHContext, DpCloudServerContext

local_root:
| type: ``str``
| argument path: ``machine/local_root``
The dir where the tasks and relating files locate. Typically the project dir.

remote_root:
| type: ``str``, optional
| argument path: ``machine/remote_root``
The dir where the tasks are executed on the remote machine.

remote_profile:
| type: ``dict``, optional
| argument path: ``machine/remote_profile``
The information used to maintain the connection with remote machine. see subclass introduction
SSHContext.remote_profile:
| type: ``dict``
| argument path: ``SSHContext.remote_profile``
hostname:
| type: ``str``
| argument path: ``SSHContext.remote_profile/hostname``
hostname or ip of ssh connection.

username:
| type: ``str``
| argument path: ``SSHContext.remote_profile/username``
username of target linux system

password:
| type: ``str``, optional
| argument path: ``SSHContext.remote_profile/password``
password of linux system

port:
| type: ``int``, optional, default: ``22``
| argument path: ``SSHContext.remote_profile/port``
ssh connection port.

key_filename:
| type: ``NoneType`` | ``str``, optional, default: ``None``
| argument path: ``SSHContext.remote_profile/key_filename``
key_filename used by ssh connection

passphrase:
| type: ``NoneType`` | ``str``, optional, default: ``None``
| argument path: ``SSHContext.remote_profile/passphrase``
passphrase used by ssh connection

timeout:
| type: ``int``, optional, default: ``10``
| argument path: ``SSHContext.remote_profile/timeout``
timeout of ssh connection
63 changes: 63 additions & 0 deletions doc/resources-auto.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
resources_dict:
| type: ``dict``
| argument path: ``resources_dict``
number_node:
| type: ``int``
| argument path: ``resources_dict/number_node``
The number of nodes needed for each `job`

cpu_per_node:
| type: ``int``
| argument path: ``resources_dict/cpu_per_node``
cpu numbers of each node.

gpu_per_node:
| type: ``int``
| argument path: ``resources_dict/gpu_per_node``
gpu numbers of each node.

queue_name:
| type: ``str``
| argument path: ``resources_dict/queue_name``
The queue name of batch job scheduler system.

group_size:
| type: ``int``
| argument path: ``resources_dict/group_size``
The number of `tasks` in a `job`.

custom_flags:
| type: ``str``, optional
| argument path: ``resources_dict/custom_flags``
The extra lines passed to the job submitting script header

strategy:
| type: ``dict``, optional
| argument path: ``resources_dict/strategy``
strategies we use to generate job submitting scripts.

para_deg:
| type: ``int``, optional
| argument path: ``resources_dict/para_deg``
Decide how many tasks will be run in parallel.

source_list:
| type: ``list``, optional
| argument path: ``resources_dict/source_list``
The env file to be sourced before the command execution.

kwargs:
| type: ``dict``, optional
| argument path: ``resources_dict/kwargs``
extra key-value pair
22 changes: 19 additions & 3 deletions dpdispatcher/dpcloudserver/zip_file.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
import os, glob
from posixpath import realpath
from zipfile import ZipFile
import shutil

# def zip_file_list(root_path, zip_filename, file_list=[]):
# shutil.make_archive(base_name=zip_filename,
# root_dir=root_path,)

def zip_file_list(root_path, zip_filename, file_list=None):
    """Create a zip archive inside ``root_path`` from a list of glob patterns.

    Parameters
    ----------
    root_path : str
        Directory the patterns are resolved against; archive paths are
        stored relative to this directory.
    zip_filename : str
        Name of the zip file to create inside ``root_path``.
    file_list : list of str, optional
        Glob patterns (relative to ``root_path``) selecting the files or
        directories to archive.  Directories are added recursively.

    Returns
    -------
    str
        Absolute-or-joined path of the created zip file.
    """
    # Avoid the shared-mutable-default pitfall of ``file_list=[]``.
    if file_list is None:
        file_list = []
    out_zip_file = os.path.join(root_path, zip_filename)
    # ``with`` guarantees the archive is flushed/closed even on error.
    with ZipFile(out_zip_file, 'w') as zip_obj:
        for pattern in file_list:
            matched_files = os.path.join(root_path, pattern)
            for ii in glob.glob(matched_files):
                arcname = os.path.relpath(ii, start=root_path)
                zip_obj.write(ii, arcname)
                if os.path.isdir(ii):
                    # Recurse into matched directories and store every file
                    # with a path relative to root_path.
                    for root, dirs, files in os.walk(ii):
                        for file in files:
                            filename = os.path.join(root, file)
                            arcname = os.path.relpath(filename, start=root_path)
                            zip_obj.write(filename, arcname)
    return out_zip_file

Expand Down
55 changes: 46 additions & 9 deletions dpdispatcher/machine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

from dpdispatcher.ssh_context import SSHSession
import os,sys,time,random,uuid,json
from typing import Optional
import dargs
Expand Down Expand Up @@ -51,6 +52,19 @@ class Machine(object):
"""

subclasses_dict = {}

# def __new__(cls, batch_type, context_type, local_root='./', remote_root=None, remote_profile={}):
# if cls is Machine:
# # instance = object.__new__(batch_type, context_type, local_root, remote_root, remote_profile)
# subcls = cls.subclasses_dict[cls.__name__]
# instance = subcls.__new__(batch_type, context_type, local_root, remote_root, remote_profile)
# else:
# instance = object.__new__(cls)
# return instance

# def __init__(self, batch_type, context_type, local_root='./', remote_root=None, remote_profile={}):
# pass

def __init__ (self,
context):
self.context = context
Expand All @@ -73,19 +87,41 @@ def load_from_json(cls, json_path):
return machine

@classmethod
def get_arginfo(cls):
    """Build the dargs ``Argument`` describing the ``machine`` dict schema.

    Returns
    -------
    Argument
        The schema object; callers use it for validation
        (``check_value``) or documentation generation (``gen_doc``).
    """
    doc_batch_type = 'The batch job system type. Option: Slurm, PBS, LSF, Shell, DpCloudServer'
    doc_context_type = 'The connection used to remote machine. Option: LocalContext, LazyLocalContext, SSHContext, DpCloudServerContext'
    doc_local_root = 'The dir where the tasks and relating files locate. Typically the project dir.'
    doc_remote_root = 'The dir where the tasks are executed on the remote machine.'
    doc_remote_profile = 'The information used to maintain the connection with remote machine. see subclass introduction'

    machine_args = [
        Argument("batch_type", str, optional=False, doc=doc_batch_type),
        Argument("context_type", str, optional=False, doc=doc_context_type),
        Argument("local_root", str, optional=False, doc=doc_local_root, default='./'),
        Argument("remote_root", str, optional=True, doc=doc_remote_root),
        Argument("remote_profile", dict, optional=True, doc=doc_remote_profile, default={}),
    ]

    machine_format = Argument("machine", dict, machine_args)
    return machine_format

@classmethod
def dargs_check(cls, machine_dict=None):
    """Validate a machine configuration dict against the dargs schema.

    Parameters
    ----------
    machine_dict : dict, optional
        The machine configuration to validate; defaults to an empty dict.

    Returns
    -------
    The value returned by ``Argument.check_value``.
    """
    # A mutable default ({}) would be shared across calls; use None sentinel.
    if machine_dict is None:
        machine_dict = {}
    machine_format = cls.get_arginfo()
    check_return = machine_format.check_value(machine_dict)
    return check_return

@classmethod
def dargs_gen_doc(cls):
    """Generate rst documentation for the machine schema, followed by the
    ``SSHContext.remote_profile`` schema."""
    doc_text = cls.get_arginfo().gen_doc()
    ssh_profile_format = BaseContext.subclasses_dict['SSHContext'].get_remote_profile_arginfo()
    doc_text += ssh_profile_format.gen_doc()
    return doc_text

@classmethod
def load_from_dict(cls, machine_dict):
batch_type = machine_dict['batch_type']
# print("debug777:batch_class", cls.subclasses_dict, batch_type)
try:
machine_class = cls.subclasses_dict[batch_type]
except KeyError as e:
Expand Down Expand Up @@ -222,3 +258,4 @@ def gen_command_env_cuda_devices(self, resources):
# for ii in list_CUDA_VISIBLE_DEVICES:
# command_env+="{ii},".format(ii=ii)
return command_env

25 changes: 24 additions & 1 deletion dpdispatcher/ssh_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os, sys, paramiko, json, uuid, tarfile, time, stat, shutil
from glob import glob
from dpdispatcher import dlog
from dargs.dargs import Argument
# from dpdispatcher.submission import Machine

class SSHSession (object):
Expand Down Expand Up @@ -195,7 +196,29 @@ def load_from_dict(cls, context_dict):
# clean_asynchronously=jdata.get('clean_asynchronously', False),
# )
return ssh_context


@staticmethod
def get_remote_profile_arginfo():
    """Build the dargs ``Argument`` describing ``SSHContext.remote_profile``.

    Returns
    -------
    Argument
        Schema of the ssh connection settings (hostname, username,
        password, port, key_filename, passphrase, timeout).
    """
    profile_fields = [
        Argument("hostname", str, optional=False,
                 doc='hostname or ip of ssh connection.'),
        Argument("username", str, optional=False,
                 doc='username of target linux system'),
        Argument("password", str, optional=True,
                 doc='password of linux system'),
        Argument("port", int, optional=True, default=22,
                 doc='ssh connection port.'),
        Argument("key_filename", [str, None], optional=True, default=None,
                 doc='key_filename used by ssh connection'),
        Argument("passphrase", [str, None], optional=True, default=None,
                 doc='passphrase used by ssh connection'),
        Argument("timeout", int, optional=True, default=10,
                 doc='timeout of ssh connection'),
    ]
    return Argument("SSHContext.remote_profile", dict, profile_fields)

@property
def ssh(self):
return self.ssh_session.get_ssh_client()
Expand Down
63 changes: 49 additions & 14 deletions dpdispatcher/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from dargs.dargs import Argument
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher import dlog
from dpdispatcher import dlog, machine
from hashlib import sha1
# from dpdispatcher.slurm import SlurmResources
#%%
Expand Down Expand Up @@ -674,21 +674,56 @@ def load_from_json(cls, json_file):

@classmethod
def load_from_dict(cls, resources_dict):
    """Construct an instance from a plain dict of constructor keywords."""
    resources = cls(**resources_dict)
    return resources

@classmethod
def get_arginfo(cls):
    """Build the dargs ``Argument`` describing the ``resources`` dict schema.

    Returns
    -------
    Argument
        The schema object; callers use it for validation
        (``check_value``) or documentation generation (``gen_doc``).
    """
    doc_number_node = 'The number of node need for each `job`'
    doc_cpu_per_node = 'cpu numbers of each node assigned to each job. Not the maximum number available in each node'
    doc_gpu_per_node = 'gpu numbers of each node assigned to each job. Not the maximum number available in each node'
    doc_queue_name = 'The queue name of batch job scheduler system.'
    doc_group_size = 'The number of `tasks` in a `job`.'
    doc_custom_flags = 'The extra lines pass to job submitting script header'
    doc_para_deg = 'Decide how many tasks will be run in parallel.'
    doc_source_list = 'The env file to be sourced before the command execution.'
    doc_kwargs = 'extra key-value pair'

    # The nested "strategy" dict has its own schema and a module-level default.
    strategy_args = [
        Argument("if_cuda_multi_devices", bool, optional=True, default=True)
    ]
    doc_strategy = 'strategies we use to generation job submitting scripts.'
    strategy_format = Argument("strategy", dict, strategy_args, optional=True, default=default_strategy, doc=doc_strategy)

    resources_args = [
        Argument("number_node", int, optional=False, doc=doc_number_node),
        Argument("cpu_per_node", int, optional=False, doc=doc_cpu_per_node),
        Argument("gpu_per_node", int, optional=False, doc=doc_gpu_per_node),
        Argument("queue_name", str, optional=False, doc=doc_queue_name),
        Argument("group_size", int, optional=False, doc=doc_group_size),

        Argument("custom_flags", str, optional=True, doc=doc_custom_flags),
        strategy_format,
        Argument("para_deg", int, optional=True, doc=doc_para_deg, default=1),
        Argument("source_list", list, optional=True, doc=doc_source_list, default=[]),
        Argument("kwargs", dict, optional=True, doc=doc_kwargs),
    ]
    resources_format = Argument("resources_dict", dict, resources_args)
    return resources_format

@classmethod
def dargs_check(cls, resources_dict=None):
    """Validate a resources configuration dict against the dargs schema.

    Parameters
    ----------
    resources_dict : dict, optional
        The resources configuration to validate; defaults to an empty dict.

    Returns
    -------
    The value returned by ``Argument.check_value``.
    """
    # A mutable default ({}) would be shared across calls; use None sentinel.
    if resources_dict is None:
        resources_dict = {}
    resources_format = cls.get_arginfo()
    check_return = resources_format.check_value(resources_dict)
    return check_return

@classmethod
def dargs_gen_doc(cls):
    """Generate the rst documentation text for the resources dict schema."""
    return cls.get_arginfo().gen_doc()

# %%

# Resources.dargs_check(if_gen_docs=True)
# %%
19 changes: 19 additions & 0 deletions scripts/script_gen_dargs_docs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

#%%
# import sys, os
# sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..' )))
# import dpdispatcher
from dpdispatcher.submission import Resources
from dpdispatcher.machine import Machine


# %%
resources_dargs_doc = Resources.dargs_gen_doc()
with open('../doc/resources-auto.rst', 'w') as f:
# print(resources_dargs_doc)
f.write(resources_dargs_doc)

machine_dargs_doc = Machine.dargs_gen_doc()
with open('../doc/machine-auto.rst', 'w') as f:
f.write(machine_dargs_doc)
# %%

0 comments on commit 7fadb30

Please sign in to comment.