diff --git a/daliuge-engine/dlg/deploy/create_dlg_job.py b/daliuge-engine/dlg/deploy/create_dlg_job.py index 25bd56f52..838956714 100644 --- a/daliuge-engine/dlg/deploy/create_dlg_job.py +++ b/daliuge-engine/dlg/deploy/create_dlg_job.py @@ -28,234 +28,51 @@ import datetime import optparse -import os import pwd import re import socket -import string -import subprocess import sys import time -import json +import os -from dlg import utils -from dlg.deploy.configs import * # get all available configurations -from dlg.runtime import __git_version__ as git_commit +from dlg.deploy.configs import ConfigFactory # get all available configurations +from deployment_constants import DEFAULT_AWS_MON_PORT, DEFAULT_AWS_MON_HOST +from slurm_client import SlurmClient -default_aws_mon_host = "sdp-dfms.ddns.net" # TODO: need to change this -default_aws_mon_port = 8898 +FACILITIES = ConfigFactory.available() -facilities = ConfigFactory.available() -class SlurmClient(object): +def get_timestamp(line): """ - parameters we can control: - - 1. user group / account name (Required) - 2. whether to submit a graph, and if so provide graph path - 3. # of nodes (of Drop Managers) - 4. how long to run - 5. whether to produce offline graph vis - 6. whether to attach proxy for remote monitoring, and if so provide - DLG_MON_HOST - DLG_MON_PORT - 7. Root directory of the Log files (Required) + microsecond precision """ + split = line.split() + date_time = "{0}T{1}".format(split[0], split[1]) + pattern = "%Y-%m-%dT%H:%M:%S,%f" + epoch = time.mktime(time.strptime(date_time, pattern)) + return datetime.datetime.strptime(date_time, pattern).microsecond / 1e6 + epoch - def __init__( - self, - log_root=None, - acc=None, - physical_graph_template_data=None, # JSON formatted physical graph template - logical_graph=None, - job_dur=30, - num_nodes=None, - run_proxy=False, - mon_host=default_aws_mon_host, - mon_port=default_aws_mon_port, - logv=1, - facility=None, - zerorun=False, - max_threads=0, - sleepncopy=False, - num_islands=None, - all_nics=False, - check_with_session=False, - submit=True, - pip_name=None, - ): - self._config = ConfigFactory.create_config(facility=facility) - self._acc = self._config.getpar("acc") if (acc is None) else acc - self._log_root = ( - self._config.getpar("log_root") if (log_root is None) else log_root - ) - self.modules = self._config.getpar("modules") - self._num_nodes = num_nodes - self._job_dur = job_dur - self._logical_graph = logical_graph - self._physical_graph_template_data = physical_graph_template_data - self._visualise_graph = False - self._run_proxy = run_proxy - self._mon_host = mon_host - self._mon_port = mon_port - self._pip_name = pip_name - self._logv = logv - self._zerorun = zerorun - self._max_threads = max_threads - self._sleepncopy = sleepncopy - self._num_islands = num_islands - self._all_nics = all_nics - self._check_with_session = check_with_session - self._submit = submit - self._dtstr = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") # .%f - self._set_name_and_nodenumber() - - def _set_name_and_nodenumber(self): - """ - Given the physical graph data extract the graph name and the total number of - nodes. We are not making a decision whether the island managers are running - on separate nodes here, thus the number is the sum of all island - managers and node managers. The values are only populated if not given on the - init already. 
- - TODO: We will probably need to do the same with job duration and CPU number - """ - pgt_data = json.loads(self._physical_graph_template_data) - try: - (pgt_name, pgt) = pgt_data - except: - raise ValueError(type(pgt_data)) - nodes = list(map(lambda x:x['node'], pgt)) - islands = list(map(lambda x:x['island'], pgt)) - if self._num_islands == None: - self._num_islands = len(dict(zip(islands,nodes))) - if self._num_nodes == None: - num_nodes = list(map(lambda x,y:x+y, islands, nodes)) - self._num_nodes = len(dict(zip(num_nodes, nodes))) # uniq comb. - if (self._pip_name == None): - self._pip_name = pgt_name - return - - - @property - def num_daliuge_nodes(self): - if self._run_proxy: - ret = self._num_nodes - 1 # exclude the proxy node - else: - ret = self._num_nodes - 0 # exclude the data island node? - if ret <= 0: - raise Exception( - "Not enough nodes {0} to run DALiuGE.".format(self._num_nodes) - ) - return ret - - def get_log_dirname(self): - """ - (pipeline name_)[Nnum_of_daliuge_nodes]_[time_stamp] - """ - # Moved setting of dtstr to init to ensure it doesn't change for this instance of SlurmClient() - #dtstr = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") # .%f - graph_name = self._pip_name.split('_')[0] # use only the part of the graph name - return "{0}_{1}".format(graph_name, self._dtstr) - - def label_job_dur(self): - """ - e.g. 135 min --> 02:15:00 - """ - seconds = self._job_dur * 60 - m, s = divmod(seconds, 60) - h, m = divmod(m, 60) - return "%02d:%02d:%02d" % (h, m, s) - - def create_job_desc(self, physical_graph_file): - log_dir = "{0}/{1}".format(self._log_root, self.get_log_dirname()) - pardict = dict() - pardict["NUM_NODES"] = str(self._num_nodes) - pardict["PIP_NAME"] = self._pip_name - pardict["SESSION_ID"] = os.path.split(log_dir)[-1] - pardict["JOB_DURATION"] = self.label_job_dur() - pardict["ACCOUNT"] = self._acc - pardict["PY_BIN"] = sys.executable - pardict["LOG_DIR"] = log_dir - pardict["GRAPH_PAR"] = ( - '-L "{0}"'.format(self._logical_graph) - if self._logical_graph - else '-P "{0}"'.format(physical_graph_file) - if physical_graph_file - else "" - ) - pardict["PROXY_PAR"] = ( - "-m %s -o %d" % (self._mon_host, self._mon_port) if self._run_proxy else "" - ) - pardict["GRAPH_VIS_PAR"] = "-d" if self._visualise_graph else "" - pardict["LOGV_PAR"] = "-v %d" % self._logv - pardict["ZERORUN_PAR"] = "-z" if self._zerorun else "" - pardict["MAXTHREADS_PAR"] = "-t %d" % (self._max_threads) - pardict["SNC_PAR"] = "--app 1" if self._sleepncopy else "--app 0" - pardict["NUM_ISLANDS_PAR"] = "-s %d" % (self._num_islands) - pardict["ALL_NICS"] = "-u" if self._all_nics else "" - pardict["CHECK_WITH_SESSION"] = "-S" if self._check_with_session else "" - pardict["MODULES"] = self.modules - - job_desc = init_tpl.safe_substitute(pardict) - return job_desc - - - def submit_job(self): - log_dir = "{0}/{1}".format(self._log_root, self.get_log_dirname()) - if not os.path.exists(log_dir): - os.makedirs(log_dir) - - physical_graph_file = "{0}/{1}".format(log_dir, - self._pip_name) - with open(physical_graph_file, 'w') as pf: - pf.write(self._physical_graph_template_data) - pf.close() - - job_file = "{0}/jobsub.sh".format(log_dir) - job_desc = self.create_job_desc(physical_graph_file) - with open(job_file, "w") as jf: - jf.write(job_desc) - - with open(os.path.join(log_dir, "git_commit.txt"), "w") as gf: - gf.write(git_commit) - if self._submit: - os.chdir(log_dir) # so that slurm logs will be dumped here - print(subprocess.check_output(["sbatch", job_file])) - else: - 
print(f"Created job submission script {job_file}") - -class LogEntryPair(object): - """ """ +class LogEntryPair: + """ + Generates log entries + """ def __init__(self, name, gstart, gend): self._name = name - self._gstart = ( - gstart + 2 - ) # group 0 is the whole matching line, group 1 is the catchall + self._gstart = (gstart + 2) # group 0 is the whole matching line, group 1 is the catchall self._gend = gend + 2 self._start_time = None self._end_time = None - self._other = dict() # hack - - def get_timestamp(self, line): - """ - microsecond precision - """ - sp = line.split() - date_time = "{0}T{1}".format(sp[0], sp[1]) - pattern = "%Y-%m-%dT%H:%M:%S,%f" - epoch = time.mktime(time.strptime(date_time, pattern)) - return datetime.datetime.strptime(date_time, pattern).microsecond / 1e6 + epoch + self._other = {} def check_start(self, match, line): if self._start_time is None and match.group(self._gstart): - self._start_time = self.get_timestamp(line) + self._start_time = get_timestamp(line) def check_end(self, match, line): if self._end_time is None and match.group(self._gend): - self._end_time = self.get_timestamp(line) + self._end_time = get_timestamp(line) if self._name == "unroll": self._other["num_drops"] = int(line.split()[-1]) elif self._name == "node managers": @@ -267,7 +84,8 @@ def check_end(self, match, line): def get_duration(self): if (self._start_time is None) or (self._end_time is None): - # print "Cannot calc duration for '{0}': start_time:{1}, end_time:{2}".format(self._name, + # print "Cannot calc duration for + # '{0}': start_time:{1}, end_time:{2}".format(self._name, # self._start_time, self._end_time) return None return self._end_time - self._start_time @@ -276,8 +94,55 @@ def reset(self): self._start_time = None self._end_time = None + @property + def name(self): + return self._name + + @property + def other(self): + return self._other + + +def build_dim_log_entry_pairs(): + return [ + LogEntryPair(name, g1, g2) + for name, g1, g2 in ( + ("unroll", 0, 1), + ("translate", 2, 3), + ("gen pg spec", 3, 4), + ("create session", 5, 6), + ("separate graph", 7, 8), + ("add session to all", 9, 10), + ("deploy session to all", 11, 12), + ("build drop connections", 13, 14), + ("trigger drops", 15, 16), + ("node managers", 17, 17), + ) + ] + + +def build_nm_log_entry_pairs(): + return [ + LogEntryPair(name, g1, g2) + for name, g1, g2 in ( + ("completion_time_old", 0, 3), # Old master branch + ("completion_time", 2, 3), + ("node_deploy_time", 1, 2), + ) + ] + + +def construct_catchall_pattern(node_type): + pattern_strs = LogParser.kwords.get(node_type) + patterns = [ + x.format(".*").replace("(", r"\(").replace(")", r"\)") for x in pattern_strs + ] + catchall = "|".join(["(%s)" % (s,) for s in patterns]) + catchall = ".*(%s).*" % (catchall,) + return re.compile(catchall) + -class LogParser(object): +class LogParser: """ TODO: This needs adjustment to new log directory names!! 
@@ -345,90 +210,54 @@ def __init__(self, log_dir): if not self.check_log_dir(log_dir): raise Exception("No DIM log found at: {0}".format(log_dir)) self._log_dir = log_dir - self._dim_catchall_pattern = self.construct_catchall_pattern(node_type="dim") - self._nm_catchall_pattern = self.construct_catchall_pattern(node_type="nm") - - def build_dim_log_entry_pairs(self): - return [ - LogEntryPair(name, g1, g2) - for name, g1, g2 in ( - ("unroll", 0, 1), - ("translate", 2, 3), - ("gen pg spec", 3, 4), - ("create session", 5, 6), - ("separate graph", 7, 8), - ("add session to all", 9, 10), - ("deploy session to all", 11, 12), - ("build drop connections", 13, 14), - ("trigger drops", 15, 16), - ("node managers", 17, 17), - ) - ] - - def build_nm_log_entry_pairs(self): - return [ - LogEntryPair(name, g1, g2) - for name, g1, g2 in ( - ("completion_time_old", 0, 3), # Old master branch - ("completion_time", 2, 3), - ("node_deploy_time", 1, 2), - ) - ] - - def construct_catchall_pattern(self, node_type): - pattern_strs = LogParser.kwords.get(node_type) - patterns = [ - x.format(".*").replace("(", r"\(").replace(")", r"\)") for x in pattern_strs - ] - catchall = "|".join(["(%s)" % (s,) for s in patterns]) - catchall = ".*(%s).*" % (catchall,) - return re.compile(catchall) + self._dim_catchall_pattern = construct_catchall_pattern(node_type="dim") + self._nm_catchall_pattern = construct_catchall_pattern(node_type="nm") def parse(self, out_csv=None): """ e.g. lofar_std_N4_2016-08-22T11-52-11 """ logb_name = os.path.basename(self._log_dir) - ss = re.search("_N[0-9]+_", logb_name) - if ss is None: + search_string = re.search("_N[0-9]+_", logb_name) + if search_string is None: raise Exception("Invalid log directory: {0}".format(self._log_dir)) - delimit = ss.group(0) - sp = logb_name.split(delimit) - pip_name = sp[0] - do_date = sp[1] + delimit = search_string.group(0) + split = logb_name.split(delimit) + pip_name = split[0] + do_date = split[1] num_nodes = int(delimit.split("_")[1][1:]) user_name = pwd.getpwuid(os.stat(self._dim_log_f[0]).st_uid).pw_name gitf = os.path.join(self._log_dir, "git_commit.txt") if os.path.exists(gitf): - with open(gitf, "r") as gf: - git_commit = gf.readline().strip() + with open(gitf, "r") as git_file: + git_commit = git_file.readline().strip() else: git_commit = "None" # parse DIM log - dim_log_pairs = self.build_dim_log_entry_pairs() + dim_log_pairs = build_dim_log_entry_pairs() for lff in self._dim_log_f: with open(lff, "r") as dimlog: for line in dimlog: - m = self._dim_catchall_pattern.match(line) - if not m: + matches = self._dim_catchall_pattern.match(line) + if not matches: continue for lep in dim_log_pairs: - lep.check_start(m, line) - lep.check_end(m, line) + lep.check_start(matches, line) + lep.check_end(matches, line) num_drops = -1 temp_dim = [] num_node_mgrs = 0 for lep in dim_log_pairs: add_dur = True - if "unroll" == lep._name: - num_drops = lep._other.get("num_drops", -1) - elif "node managers" == lep._name: - num_node_mgrs = lep._other.get("num_node_mgrs", 0) + if lep.name == "unroll": + num_drops = lep.other.get("num_drops", -1) + elif lep.name == "node managers": + num_node_mgrs = lep.other.get("num_node_mgrs", 0) add_dur = False - elif "build drop connections" == lep._name: - num_edges = lep._other.get("num_edges", -1) + elif lep.name == "build drop connections": + num_edges = lep.other.get("num_edges", -1) temp_dim.append(str(num_edges)) if add_dur: temp_dim.append(str(lep.get_duration())) @@ -439,32 +268,32 @@ def parse(self, out_csv=None): 
num_finished_sess = 0 num_dims = 0 - for df in os.listdir(self._log_dir): + for log_directory_file_name in os.listdir(self._log_dir): # Check this is a dir and contains the NM log - if not os.path.isdir(os.path.join(self._log_dir, df)): + if not os.path.isdir(os.path.join(self._log_dir, log_directory_file_name)): continue - nm_logf = os.path.join(self._log_dir, df, "dlgNM.log") - nm_dim_logf = os.path.join(self._log_dir, df, "dlgDIM.log") - nm_mm_logf = os.path.join(self._log_dir, df, "dlgMM.log") + nm_logf = os.path.join(self._log_dir, log_directory_file_name, "dlgNM.log") + nm_dim_logf = os.path.join(self._log_dir, log_directory_file_name, "dlgDIM.log") + nm_mm_logf = os.path.join(self._log_dir, log_directory_file_name, "dlgMM.log") if not os.path.exists(nm_logf): if os.path.exists(nm_dim_logf) or os.path.exists(nm_mm_logf): num_dims += 1 continue # Start anew every time - nm_log_pairs = self.build_nm_log_entry_pairs() + nm_log_pairs = build_nm_log_entry_pairs() nm_logs.append(nm_log_pairs) # Read NM log and fill all LogPair objects with open(nm_logf, "r") as nmlog: for line in nmlog: - m = self._nm_catchall_pattern.match(line) - if not m: + matches = self._nm_catchall_pattern.match(line) + if not matches: continue for lep in nm_log_pairs: - lep.check_start(m, line) - lep.check_end(m, line) + lep.check_start(matches, line) + lep.check_end(matches, line) # Looking for the deployment times and counting for finished sessions for lep in nm_log_pairs: @@ -474,9 +303,9 @@ def parse(self, out_csv=None): if dur is None: continue - if lep._name in ("completion_time", "completion_time_old"): + if lep.name in ("completion_time", "completion_time_old"): num_finished_sess += 1 - elif lep._name == "node_deploy_time": + elif lep.name == "node_deploy_time": if dur > max_node_deploy_time: max_node_deploy_time = dur @@ -505,14 +334,13 @@ def parse(self, out_csv=None): max_exec_time = 0 for log_entry_pairs in nm_logs: - indexed_leps = {lep._name: lep for lep in log_entry_pairs} + indexed_leps = {lep.name: lep for lep in log_entry_pairs} deploy_time = indexed_leps["node_deploy_time"].get_duration() if deploy_time is None: # since some node managers failed to start continue - exec_time = ( - indexed_leps["completion_time"].get_duration() - or indexed_leps["completion_time_old"].get_duration() - ) + exec_time = (indexed_leps["completion_time"].get_duration() + or indexed_leps["completion_time_old"].get_duration() + ) if exec_time is None: continue real_exec_time = exec_time - (max_node_deploy_time - deploy_time) @@ -534,9 +362,9 @@ def parse(self, out_csv=None): num_dims = num_dims if num_dims == 1 else num_dims - 1 # exclude master manager add_line = ",".join(ret + temp_dim + temp_nm + [str(int(num_dims))]) if out_csv is not None: - with open(out_csv, "a") as of: - of.write(add_line) - of.write(os.linesep) + with open(out_csv, "a") as out_file: + out_file.write(add_line) + out_file.write(os.linesep) else: print(add_line) @@ -556,8 +384,9 @@ def check_log_dir(self, log_dir): return False -if __name__ == "__main__": - parser = optparse.OptionParser(usage='\n%prog -a [1|2] -f [options]\n\n%prog -h for further help') +def main(): + parser = optparse.OptionParser( + usage='\n%prog -a [1|2] -f [options]\n\n%prog -h for further help') parser.add_option( "-a", @@ -642,7 +471,7 @@ def check_log_dir(self, log_dir): type="string", dest="mon_host", help="Monitor host IP (optional)", - default=default_aws_mon_host, + default=DEFAULT_AWS_MON_HOST, ) parser.add_option( "-o", @@ -651,7 +480,7 @@ def check_log_dir(self, 
log_dir): type="int", dest="mon_port", help="The port to bind DALiuGE monitor", - default=default_aws_mon_port, + default=DEFAULT_AWS_MON_PORT, ) parser.add_option( "-v", @@ -731,9 +560,9 @@ def check_log_dir(self, log_dir): "-f", "--facility", dest="facility", - choices=facilities, + choices=FACILITIES, action="store", - help=f"The facility for which to create a submission job\nValid options: {facilities}", + help=f"The facility for which to create a submission job\nValid options: {FACILITIES}", default=None, ) parser.add_option( @@ -744,12 +573,12 @@ def check_log_dir(self, log_dir): default=True, ) - (opts, args) = parser.parse_args(sys.argv) + (opts, _) = parser.parse_args(sys.argv) if not (opts.action and opts.facility) and not opts.configs: parser.error("Missing required parameters!") - if opts.facility not in facilities: - parser.error(f"Unknown facility provided. Please choose from {facilities}") - + if opts.facility not in FACILITIES: + parser.error(f"Unknown facility provided. Please choose from {FACILITIES}") + if opts.action == 2: if opts.log_dir is None: # you can specify: @@ -765,14 +594,14 @@ def check_log_dir(self, log_dir): ) # or a root log directory else: - for df in os.listdir(log_root): - df = os.path.join(log_root, df) - if os.path.isdir(df): + for log_dir in os.listdir(log_root): + log_dir = os.path.join(log_root, log_dir) + if os.path.isdir(log_dir): try: - log_parser = LogParser(df) + log_parser = LogParser(log_dir) log_parser.parse(out_csv=opts.csv_output) except Exception as exp: - print("Fail to parse {0}: {1}".format(df, exp)) + print("Fail to parse {0}: {1}".format(log_dir, exp)) else: log_parser = LogParser(opts.log_dir) log_parser.parse(out_csv=opts.csv_output) @@ -786,7 +615,7 @@ def check_log_dir(self, log_dir): if path_to_graph_file and not os.path.exists(path_to_graph_file): parser.error("Cannot locate graph file at '{0}'".format(path_to_graph_file)) - pc = SlurmClient( + client = SlurmClient( facility=opts.facility, job_dur=opts.job_dur, num_nodes=opts.num_nodes, @@ -801,12 +630,16 @@ def check_log_dir(self, log_dir): check_with_session=opts.check_with_session, logical_graph=opts.logical_graph, physical_graph=opts.physical_graph, - submit=True if opts.submit in ['True','true'] else False, + submit=opts.submit in ['True', 'true'], ) - pc._visualise_graph = opts.visualise_graph - pc.submit_job() - elif opts.configs == True: - print(f"Available facilities: {facilities}") + client._visualise_graph = opts.visualise_graph + client.submit_job() + elif opts.configs: + print(f"Available facilities: {FACILITIES}") else: parser.print_help() parser.error("Invalid input!") + + +if __name__ == "__main__": + main() diff --git a/daliuge-engine/dlg/deploy/deployment_constants.py b/daliuge-engine/dlg/deploy/deployment_constants.py new file mode 100644 index 000000000..c35fd91fe --- /dev/null +++ b/daliuge-engine/dlg/deploy/deployment_constants.py @@ -0,0 +1,27 @@ +# +# ICRAR - International Centre for Radio Astronomy Research +# (c) UWA - The University of Western Australia, 2016 +# Copyright by UWA (in the framework of the ICRAR) +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +""" +Contains deployment constants, that could be changed to easily re-configure deployment. +""" + +DEFAULT_AWS_MON_HOST = "sdp-dfms.ddns.net" # TODO: need to change this +DEFAULT_AWS_MON_PORT = 8898 diff --git a/daliuge-engine/dlg/deploy/slurm_utils.py b/daliuge-engine/dlg/deploy/deployment_utils.py similarity index 69% rename from daliuge-engine/dlg/deploy/slurm_utils.py rename to daliuge-engine/dlg/deploy/deployment_utils.py index d753e87ca..657c44868 100644 --- a/daliuge-engine/dlg/deploy/slurm_utils.py +++ b/daliuge-engine/dlg/deploy/deployment_utils.py @@ -19,6 +19,7 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, # MA 02111-1307 USA # +import json class ListTokens(object): @@ -101,3 +102,51 @@ def finish_element(sub_values, range_start): def list_as_string(s): """'a008,b[072-073,076]' --> ['a008', 'b072', 'b073', 'b076']""" return _parse_list_tokens(iter(_list_tokenizer(s))) + + +def find_numislands(physical_graph_template_file): + """ + Given the physical graph data extract the graph name and the total number of + nodes. We are not making a decision whether the island managers are running + on separate nodes here, thus the number is the sum of all island + managers and node managers. The values are only populated if not given on the + init already. + TODO: We will probably need to do the same with job duration and CPU number + """ + + pgt_data = json.loads(physical_graph_template_file) + try: + (pgt_name, pgt) = pgt_data + except: + raise ValueError(type(pgt_data)) + nodes = list(map(lambda x: x['node'], pgt)) + islands = list(map(lambda x: x['island'], pgt)) + num_islands = len(dict(zip(islands, nodes))) + num_nodes = list(map(lambda x, y: x + y, islands, nodes)) + pip_name = pgt_name + return num_islands, num_nodes, pip_name + + +def label_job_dur(job_dur): + """ + e.g. 135 min --> 02:15:00 + """ + seconds = job_dur * 60 + minute, sec = divmod(seconds, 60) + hour, minute = divmod(minute, 60) + return "%02d:%02d:%02d" % (hour, minute, sec) + + +def num_daliuge_nodes(num_nodes: int, run_proxy: bool): + """ + Returns the number of daliuge nodes available to run workflow + """ + if run_proxy: + ret = num_nodes - 1 # exclude the proxy node + else: + ret = num_nodes - 0 # exclude the data island node? + if ret <= 0: + raise Exception( + "Not enough nodes {0} to run DALiuGE.".format(num_nodes) + ) + return ret diff --git a/daliuge-engine/dlg/deploy/helm_client.py b/daliuge-engine/dlg/deploy/helm_client.py new file mode 100644 index 000000000..7e8b57d4a --- /dev/null +++ b/daliuge-engine/dlg/deploy/helm_client.py @@ -0,0 +1,180 @@ +# +# ICRAR - International Centre for Radio Astronomy Research +# (c) UWA - The University of Western Australia, 2016 +# Copyright by UWA (in the framework of the ICRAR) +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +""" +Contains a module translating physical graphs to kubernetes helm charts. +""" +import json +import re +import time +import os +import sys +import shutil +import pathlib + +import dlg +import yaml +import subprocess +from dlg.common.version import version as dlg_version +from dlg.restutils import RestClient +from dlg.deploy.common import submit + + +def _write_chart(chart_dir, name: str, chart_name: str, version: str, app_version: str, home: str, + description, keywords: list, sources: list, kubeVersion: str): + chart_info = {'apiVersion': "v2", 'name': chart_name, 'type': 'application', 'version': version, + 'appVersion': app_version, 'home': home, 'description': description, + 'keywords': keywords, 'sources': sources, 'kubeVersion': kubeVersion} + # TODO: Fix app_version quotations. + with open(f'{chart_dir}{os.sep}{name}', 'w', encoding='utf-8') as chart_file: + yaml.dump(chart_info, chart_file) + + +def _write_values(chart_dir, config): + with open(f"{chart_dir}{os.sep}custom-values.yaml", 'w', encoding='utf-8') as value_file: + yaml.dump(config, value_file) + + +def _read_values(chart_dir): + with open(f"{chart_dir}{os.sep}values.yaml", 'r', encoding='utf-8') as old_file: + data = yaml.safe_load(old_file) + with open(f"{chart_dir}{os.sep}values.yaml", 'r', encoding='utf-8') as custom_file: + new_data = yaml.safe_load(custom_file) + data.update(new_data) + return data + + +def _find_resources(pgt_data): + pgt = json.loads(pgt_data) + nodes = list(map(lambda x: x['node'], pgt)) + islands = list(map(lambda x: x['island'], pgt)) + num_islands = len(dict(zip(islands, nodes))) + num_nodes = len(nodes) + return num_islands, num_nodes + + +class HelmClient: + """ + Writes necessary files to launch job with kubernetes. + """ + + def __init__(self, deploy_name, chart_name="daliuge-daemon", deploy_dir="./", + submit=True, chart_version="0.1.0", + value_config=None, physical_graph_file=None, chart_vars=None): + if value_config is None: + value_config = dict() + self._chart_name = chart_name + self._chart_vars = {'name': 'daliuge-daemon', + 'appVersion': 'v1.0.0', + 'home': 'https://github.com/ICRAR/daliuge/daliuge-k8s', + 'description': 'DALiuGE k8s deployment', + 'keywords': ['daliuge', 'workflow'], + 'sources': ['https://github.com/ICRAR/daliuge/daliuge-k8s'], + 'kubeVersion': ">=1.10.0-0" + } + if chart_vars is not None: + self._chart_vars.update(chart_vars) + self._deploy_dir = deploy_dir + self._chart_dir = os.path.join(self._deploy_dir, 'daliuge-daemon') + self._chart_version = chart_version + self._deploy_name = deploy_name + self._submit = submit + self._value_data = value_config if value_config is not None else {} + self._submission_endpoint = None + if physical_graph_file is not None: + self._set_physical_graph(physical_graph_file) + + # Copy in template files. 
+ library_root = pathlib.Path(os.path.dirname(dlg.__file__)).parent.parent + print(library_root) + if sys.version_info >= (3, 8): + shutil.copytree(os.path.join(library_root, 'daliuge-k8s', 'helm'), self._deploy_dir, + dirs_exist_ok=True) + else: + shutil.copytree(os.path.join(library_root, 'daliuge-k8s', 'helm'), self._deploy_dir) + + def _set_physical_graph(self, physical_graph_content): + self._physical_graph_file = physical_graph_content + self._num_islands, self._num_nodes = _find_resources( + self._physical_graph_file) + + def create_helm_chart(self, physical_graph_content): + """ + Translates a physical graph to a kubernetes helm chart. + For now, it will just try to run everything in a single container. + """ + _write_chart(self._chart_dir, 'Chart.yaml', self._chart_name, self._chart_version, + dlg_version, + self._chart_vars['home'], self._chart_vars['description'], + self._chart_vars['keywords'], self._chart_vars['sources'], + self._chart_vars['kubeVersion']) + # Update values.yaml + _write_values(self._chart_dir, self._value_data) + self._value_data = _read_values(self._chart_dir) + # Add charts + # TODO: Add charts to helm + self._set_physical_graph(physical_graph_content) + # Update template + # TODO: Update templates in helm + + def launch_helm(self): + """ + Launches the built helm chart using the most straightforward commands possible. + Assumes all files are prepared and validated. + """ + if self._submit: + os.chdir(self._deploy_dir) + instruction = f'helm install {self._deploy_name} {self._chart_name}/ ' \ + f'--values {self._chart_name}{os.sep}custom-values.yaml' + print(subprocess.check_output([instruction], + shell=True).decode('utf-8')) + query = str(subprocess.check_output(['kubectl get svc -o wide'], shell=True)) + # WARNING: May be problematic later if multiple services are running + pattern = r"-service\s*ClusterIP\s*\d+\.\d+\.\d+\.\d+" + ip_pattern = r"\d+\.\d+\.\d+\.\d+" + outcome = re.search(pattern, query) + if outcome: + manager_ip = re.search(ip_pattern, outcome.string) + self._submission_endpoint = manager_ip.group(0) + client = RestClient(self._submission_endpoint, + self._value_data['service']['daemon']['port']) + data = json.dumps({'nodes': ["127.0.0.1"]}).encode('utf-8') + time.sleep(5) # TODO: Deterministic deployment information + client._POST('/managers/island/start', content=data, + content_type='application/json') + client._POST('/managers/master/start', content=data, + content_type='application/json') + else: + print("Could not find manager IP address") + + else: + print(f"Created helm chart {self._chart_name} in {self._deploy_dir}") + + def teardown(self): + subprocess.check_output(['helm uninstall daliuge-daemon'], shell=True) + + def submit_job(self): + """ + There is a semi-dynamic element to fetching the IPs of Node(s) to deploy to. + Hence, launching the chart and initiating graph execution have been de-coupled. + """ + pg_data = json.loads(self._physical_graph_file) + submit(pg_data, self._submission_endpoint) diff --git a/daliuge-engine/dlg/deploy/remotes.py b/daliuge-engine/dlg/deploy/remotes.py index f885d30fa..38245cd57 100644 --- a/daliuge-engine/dlg/deploy/remotes.py +++ b/daliuge-engine/dlg/deploy/remotes.py @@ -27,7 +27,7 @@ import re import socket -from . import slurm_utils +from . 
import deployment_utils logger = logging.getLogger(__name__) @@ -177,7 +177,7 @@ def __init__(self, options, my_ip): self._set_world( int(os.environ["SLURM_PROCID"]), int(os.environ["SLURM_NTASKS"]), - slurm_utils.list_as_string(os.environ["SLURM_NODELIST"]), + deployment_utils.list_as_string(os.environ["SLURM_NODELIST"]), ) diff --git a/daliuge-engine/dlg/deploy/slurm_client.py b/daliuge-engine/dlg/deploy/slurm_client.py new file mode 100644 index 000000000..08f83971c --- /dev/null +++ b/daliuge-engine/dlg/deploy/slurm_client.py @@ -0,0 +1,171 @@ +# +# ICRAR - International Centre for Radio Astronomy Research +# (c) UWA - The University of Western Australia, 2016 +# Copyright by UWA (in the framework of the ICRAR) +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +""" +Contains a slurm client which generates slurm scripts from daliuge graphs. +""" + +import datetime +import sys +import os +import subprocess +from dlg.runtime import __git_version__ as git_commit + +from dlg.deploy.configs import ConfigFactory, init_tpl +from deployment_constants import DEFAULT_AWS_MON_PORT, DEFAULT_AWS_MON_HOST +from deployment_utils import find_numislands, label_job_dur + + +class SlurmClient: + """ + parameters we can control: + + 1. user group / account name (Required) + 2. whether to submit a graph, and if so provide graph path + 3. # of nodes (of Drop Managers) + 4. how long to run + 5. whether to produce offline graph vis + 6. whether to attach proxy for remote monitoring, and if so provide + DLG_MON_HOST + DLG_MON_PORT + 7. 
Root directory of the Log files (Required) + """ + + def __init__( + self, + log_root=None, + acc=None, + physical_graph_template_data=None, # JSON formatted physical graph template + logical_graph=None, + job_dur=30, + num_nodes=None, + run_proxy=False, + mon_host=DEFAULT_AWS_MON_HOST, + mon_port=DEFAULT_AWS_MON_PORT, + logv=1, + facility=None, + zerorun=False, + max_threads=0, + sleepncopy=False, + num_islands=None, + all_nics=False, + check_with_session=False, + submit=True, + pip_name=None, + ): + self._config = ConfigFactory.create_config(facility=facility) + self._acc = self._config.getpar("acc") if (acc is None) else acc + self._log_root = ( + self._config.getpar("log_root") if (log_root is None) else log_root + ) + self.modules = self._config.getpar("modules") + self._num_nodes = num_nodes + self._job_dur = job_dur + self._logical_graph = logical_graph + self._physical_graph_template_data = physical_graph_template_data + self._visualise_graph = False + self._run_proxy = run_proxy + self._mon_host = mon_host + self._mon_port = mon_port + self._pip_name = pip_name + self._logv = logv + self._zerorun = zerorun + self._max_threads = max_threads + self._sleepncopy = sleepncopy + self._num_islands = num_islands + self._all_nics = all_nics + self._check_with_session = check_with_session + self._submit = submit + self._dtstr = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") # .%f + self._num_islands, self._num_nodes, self._pip_name = find_numislands( + self._physical_graph_template_data) + + def get_log_dirname(self): + """ + (pipeline name_)[Nnum_of_daliuge_nodes]_[time_stamp] + """ + # Moved setting of dtstr to init + # to ensure it doesn't change for this instance of SlurmClient() + # dtstr = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") # .%f + graph_name = self._pip_name.split('_')[0] # use only the part of the graph name + return "{0}_{1}".format(graph_name, self._dtstr) + + def create_job_desc(self, physical_graph_file): + """ + Creates the slurm script from a physical graph + """ + log_dir = "{0}/{1}".format(self._log_root, self.get_log_dirname()) + pardict = dict() + pardict["NUM_NODES"] = str(self._num_nodes) + pardict["PIP_NAME"] = self._pip_name + pardict["SESSION_ID"] = os.path.split(log_dir)[-1] + pardict["JOB_DURATION"] = label_job_dur(self._job_dur) + pardict["ACCOUNT"] = self._acc + pardict["PY_BIN"] = sys.executable + pardict["LOG_DIR"] = log_dir + pardict["GRAPH_PAR"] = ( + '-L "{0}"'.format(self._logical_graph) + if self._logical_graph + else '-P "{0}"'.format(physical_graph_file) + if physical_graph_file + else "" + ) + pardict["PROXY_PAR"] = ( + "-m %s -o %d" % (self._mon_host, self._mon_port) if self._run_proxy else "" + ) + pardict["GRAPH_VIS_PAR"] = "-d" if self._visualise_graph else "" + pardict["LOGV_PAR"] = "-v %d" % self._logv + pardict["ZERORUN_PAR"] = "-z" if self._zerorun else "" + pardict["MAXTHREADS_PAR"] = "-t %d" % self._max_threads + pardict["SNC_PAR"] = "--app 1" if self._sleepncopy else "--app 0" + pardict["NUM_ISLANDS_PAR"] = "-s %d" % self._num_islands + pardict["ALL_NICS"] = "-u" if self._all_nics else "" + pardict["CHECK_WITH_SESSION"] = "-S" if self._check_with_session else "" + pardict["MODULES"] = self.modules + + job_desc = init_tpl.safe_substitute(pardict) + return job_desc + + def submit_job(self): + """ + Submits the slurm script to the cluster + """ + log_dir = "{0}/{1}".format(self._log_root, self.get_log_dirname()) + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + physical_graph_file_name = 
"{0}/{1}".format(log_dir, self._pip_name) + with open(physical_graph_file_name, 'w') as physical_graph_file: + physical_graph_file.write(self._physical_graph_template_data) + physical_graph_file.close() + + job_file_name = "{0}/jobsub.sh".format(log_dir) + job_desc = self.create_job_desc(physical_graph_file_name) + with open(job_file_name, "w") as job_file: + job_file.write(job_desc) + + with open(os.path.join(log_dir, "git_commit.txt"), "w") as git_file: + git_file.write(git_commit) + if self._submit: + os.chdir(log_dir) # so that slurm logs will be dumped here + print(subprocess.check_output(["sbatch", job_file_name])) + else: + print(f"Created job submission script {job_file_name}") diff --git a/daliuge-engine/dlg/deploy/start_dlg_cluster.py b/daliuge-engine/dlg/deploy/start_dlg_cluster.py index 193e8d30b..9ada67b0e 100644 --- a/daliuge-engine/dlg/deploy/start_dlg_cluster.py +++ b/daliuge-engine/dlg/deploy/start_dlg_cluster.py @@ -55,14 +55,13 @@ MASTER_DEFAULT_REST_PORT, ) - DIM_WAIT_TIME = 60 MM_WAIT_TIME = DIM_WAIT_TIME GRAPH_SUBMIT_WAIT_TIME = 10 GRAPH_MONITOR_INTERVAL = 5 VERBOSITY = "5" -logger = logging.getLogger("deploy.dlg.cluster") -apps = ( +LOGGER = logging.getLogger("deploy.dlg.cluster") +APPS = ( None, "test.graphsRepository.SleepApp", "test.graphsRepository.SleepAndCopyApp", @@ -81,9 +80,9 @@ def check_host(host, port, timeout=5, check_with_session=False): try: session_id = str(uuid.uuid4()) - with NodeManagerClient(host, port, timeout=timeout) as c: - c.create_session(session_id) - c.destroy_session(session_id) + with NodeManagerClient(host, port, timeout=timeout) as client: + client.create_session(session_id) + client.destroy_session(session_id) return True except: return False @@ -95,31 +94,31 @@ def check_hosts(ips, port, timeout=None, check_with_session=False, retry=1): given timeout, and returns the list of IPs that were found to be up. 
""" - def check_and_add(ip): + def check_and_add(ip_addr): ntries = retry while ntries: if check_host( - ip, port, timeout=timeout, check_with_session=check_with_session + ip_addr, port, timeout=timeout, check_with_session=check_with_session ): - logger.info("Host %s:%d is running", ip, port) - return ip - logger.warning("Failed to contact host %s:%d", ip, port) + LOGGER.info("Host %s:%d is running", ip_addr, port) + return ip_addr + LOGGER.warning("Failed to contact host %s:%d", ip_addr, port) ntries -= 1 return None # Don't return None values - tp = multiprocessing.pool.ThreadPool(min(50, len(ips))) - up = tp.map(check_and_add, ips) - tp.close() - tp.join() + thread_pool = multiprocessing.pool.ThreadPool(min(50, len(ips))) + result_pool = thread_pool.map(check_and_add, ips) + thread_pool.close() + thread_pool.join() - return [ip for ip in up if ip] + return [ip for ip in result_pool if ip] def get_ip_via_ifconfig(iface_index): out = subprocess.check_output("ifconfig") ifaces_info = list(filter(None, out.split(b"\n\n"))) - logger.info("Found %d interfaces, getting %d", len(ifaces_info), iface_index) + LOGGER.info("Found %d interfaces, getting %d", len(ifaces_info), iface_index) for line in ifaces_info[iface_index].splitlines(): line = line.strip() if line.startswith(b"inet"): @@ -130,27 +129,29 @@ def get_ip_via_ifconfig(iface_index): def get_ip_via_netifaces(iface_index): return utils.get_local_ip_addr()[iface_index][0] + def get_workspace_dir(log_dir): """ Common workspace dir for all nodes just underneath main session directory """ - return(f"{os.path.split(log_dir)[0]}/workspace") + return f"{os.path.split(log_dir)[0]}/workspace" + def start_node_mgr( - log_dir, my_ip, logv=1, max_threads=0, host=None, event_listeners="" + log_dir, my_ip, logv=1, max_threads=0, host=None, event_listeners="" ): """ Start node manager """ - logger.info("Starting node manager on host %s", my_ip) + LOGGER.info("Starting node manager on host %s", my_ip) host = host or "0.0.0.0" - lv = "v" * logv + log_level = "v" * logv args = [ "-l", log_dir, "-w", get_workspace_dir(log_dir), - "-%s" % lv, + "-%s" % log_level, "-H", host, "-m", @@ -163,25 +164,24 @@ def start_node_mgr( ] # return cmdline.dlgNM(optparse.OptionParser(), args) proc = tool.start_process("nm", args) - logger.info("Node manager process started with pid %d", proc.pid) + LOGGER.info("Node manager process started with pid %d", proc.pid) return proc - def start_dim(node_list, log_dir, origin_ip, logv=1): """ Start data island manager """ - logger.info( + LOGGER.info( "Starting island manager on host %s for node managers %r", origin_ip, node_list ) - lv = "v" * logv + log_level = "v" * logv args = [ "-l", log_dir, "-w", get_workspace_dir(log_dir), - "-%s" % lv, + "-%s" % log_level, "-N", ",".join(node_list), "-H", @@ -190,7 +190,7 @@ def start_dim(node_list, log_dir, origin_ip, logv=1): "2048", ] proc = tool.start_process("dim", args) - logger.info("Island manager process started with pid %d", proc.pid) + LOGGER.info("Island manager process started with pid %d", proc.pid) return proc @@ -200,7 +200,7 @@ def start_mm(node_list, log_dir, logv=1): node_list: a list of node address that host DIMs """ - lv = "v" * logv + log_level = "v" * logv parser = optparse.OptionParser() args = [ "-l", @@ -209,7 +209,7 @@ def start_mm(node_list, log_dir, logv=1): get_workspace_dir(log_dir), "-N", ",".join(node_list), - "-%s" % lv, + "-%s" % log_level, "-H", "0.0.0.0", "-m", @@ -222,10 +222,10 @@ def _stop(endpoints): def _the_stop(endpoint): 
common.BaseDROPManagerClient(endpoint[0], endpoint[1]).stop() - tp = multiprocessing.pool.ThreadPool(min(50, len(endpoints))) - tp.map(_the_stop, endpoints) - tp.close() - tp.join() + thread_pool = multiprocessing.pool.ThreadPool(min(50, len(endpoints))) + thread_pool.map(_the_stop, endpoints) + thread_pool.close() + thread_pool.join() def stop_nms(ips): @@ -236,16 +236,17 @@ def stop_dims(ips): _stop([(ip, ISLAND_DEFAULT_REST_PORT) for ip in ips]) -def stop_mm(ip): - _stop([(ip, MASTER_DEFAULT_REST_PORT)]) +def stop_mm(ip_addr): + _stop([(ip_addr, MASTER_DEFAULT_REST_PORT)]) -def submit_and_monitor(pg, opts, port): +def submit_and_monitor(physical_graph, opts, port): def _task(): dump_path = None if opts.dump: dump_path = os.path.join(opts.log_dir, "status-monitoring.json") - session_id = common.submit(pg, host="127.0.0.1", port=port, session_id=opts.ssid) + session_id = common.submit(physical_graph, host="127.0.0.1", port=port, + session_id=opts.ssid) while True: try: common.monitor_sessions( @@ -253,11 +254,11 @@ def _task(): ) break except: - logger.exception("Monitoring failed, restarting it") + LOGGER.exception("Monitoring failed, restarting it") - t = threading.Thread(target=_task) - t.start() - return t + threads = threading.Thread(target=_task) + threads.start() + return threads def start_proxy(dlg_host, dlg_port, monitor_host, monitor_port): @@ -271,10 +272,10 @@ def start_proxy(dlg_host, dlg_port, monitor_host, monitor_port): try: server.loop() except KeyboardInterrupt: - logger.warning("Ctrl C - Stopping DALiuGE Proxy server") + LOGGER.warning("Ctrl C - Stopping DALiuGE Proxy server") sys.exit(1) except Exception: - logger.exception("DALiuGE proxy terminated unexpectedly") + LOGGER.exception("DALiuGE proxy terminated unexpectedly") sys.exit(1) @@ -290,13 +291,13 @@ def get_pg(opts, nms, dims): """Gets the Physical Graph that is eventually submitted to the cluster, if any""" if not opts.logical_graph and not opts.physical_graph: - return + return [] num_nms = len(nms) num_dims = len(dims) if opts.logical_graph: unrolled = pg_generator.unroll( - opts.logical_graph, opts.ssid, opts.zerorun, apps[opts.app] + opts.logical_graph, opts.ssid, opts.zerorun, APPS[opts.app] ) algo_params = tool.parse_partition_algo_params(opts.algo_params) pgt = pg_generator.partition( @@ -308,8 +309,8 @@ def get_pg(opts, nms, dims): ) del unrolled # quickly dispose of potentially big object else: - with open(opts.physical_graph, "rb") as f: - pgt = json.load(f) + with open(opts.physical_graph, "rb") as pg_file: + pgt = json.load(pg_file) # modify the PG as necessary for modifier in opts.pg_modifiers.split(":"): @@ -324,13 +325,13 @@ def get_pg(opts, nms, dims): timeout=MM_WAIT_TIME, retry=3, ) - pg = pg_generator.resource_map(pgt, dims + nms, num_islands=num_dims, - co_host_dim=opts.co_host_dim) + physical_graph = pg_generator.resource_map(pgt, dims + nms, num_islands=num_dims, + co_host_dim=opts.co_host_dim) graph_name = os.path.basename(opts.log_dir) graph_name = f"{graph_name.split('_')[0]}.json" # get just the graph name - with open(os.path.join(opts.log_dir, graph_name), "wt") as f: - json.dump(pg, f) - return pg + with open(os.path.join(opts.log_dir, graph_name), "wt") as pg_file: + json.dump(physical_graph, pg_file) + return physical_graph def get_ip(opts): @@ -342,16 +343,15 @@ def get_remote(opts): my_ip = get_ip(opts) if opts.remote_mechanism == "mpi": return remotes.MPIRemote(opts, my_ip) - elif opts.remote_mechanism == "dlg": + if opts.remote_mechanism == "dlg": return 
remotes.DALiuGERemote(opts, my_ip) - elif opts.remote_mechanism == "dlg-hybrid": + if opts.remote_mechanism == "dlg-hybrid": return remotes.DALiuGEHybridRemote(opts, my_ip) else: # == 'slurm' return remotes.SlurmRemote(opts, my_ip) def main(): - parser = optparse.OptionParser() parser.add_option( "-l", @@ -568,11 +568,11 @@ def main(): try: print("From netifaces: %s" % get_ip_via_netifaces(options.interface)) except: - logger.exception("Failed to get information via netifaces") + LOGGER.exception("Failed to get information via netifaces") try: print("From ifconfig: %s" % get_ip_via_ifconfig(options.interface)) except: - logger.exception("Failed to get information via ifconfig") + LOGGER.exception("Failed to get information via ifconfig") sys.exit(0) elif options.collect_interfaces: from mpi4py import MPI @@ -587,9 +587,9 @@ def main(): parser.error( "Either a logical graph or physical graph filename must be specified" ) - for p in (options.logical_graph, options.physical_graph): - if p and not os.path.exists(p): - parser.error("Cannot locate graph file at '{0}'".format(p)) + for graph_file_name in (options.logical_graph, options.physical_graph): + if graph_file_name and not os.path.exists(graph_file_name): + parser.error("Cannot locate graph file at '{0}'".format(graph_file_name)) if options.monitor_host is not None and options.num_islands > 1: parser.error("We do not support proxy monitor multiple islands yet") @@ -602,38 +602,30 @@ def main(): log_dir = "{0}/{1}".format(options.log_dir, remote.my_ip) os.makedirs(log_dir) logfile = log_dir + "/start_dlg_cluster.log" - FORMAT = "%(asctime)-15s [%(levelname)5.5s] [%(threadName)15.15s] %(name)s#%(funcName)s:%(lineno)s %(message)s" - logging.basicConfig(filename=logfile, level=logging.DEBUG, format=FORMAT) + log_format = "%(asctime)-15s [%(levelname)5.5s] [%(threadName)15.15s] " \ + "%(name)s#%(funcName)s:%(lineno)s %(message)s" + logging.basicConfig(filename=logfile, level=logging.DEBUG, format=log_format) - logger.info("Starting DALiuGE cluster with %d nodes", remote.size) - logger.debug("Cluster nodes: %r", remote.sorted_peers) - logger.debug("Using %s as the local IP where required", remote.my_ip) + LOGGER.info("Starting DALiuGE cluster with %d nodes", remote.size) + LOGGER.debug("Cluster nodes: %r", remote.sorted_peers) + LOGGER.debug("Using %s as the local IP where required", remote.my_ip) - envfile = os.path.join(log_dir, "env.txt") - logger.debug("Dumping process' environment to %s", envfile) - with open(envfile, "wt") as f: + envfile_name = os.path.join(log_dir, "env.txt") + LOGGER.debug("Dumping process' environment to %s", envfile_name) + with open(envfile_name, "wt") as env_file: for name, value in sorted(os.environ.items()): - f.write("%s=%s\n" % (name, value)) + env_file.write("%s=%s\n" % (name, value)) logv = max(min(3, options.verbose_level), 1) if remote.is_highest_level_manager: nodesfile = os.path.join(log_dir, "nodes.txt") - logger.debug("Dumping list of nodes to %s", nodesfile) - with open(nodesfile, "wt") as f: - f.write("\n".join(remote.sorted_peers)) + LOGGER.debug("Dumping list of nodes to %s", nodesfile) + with open(nodesfile, "wt") as env_file: + env_file.write("\n".join(remote.sorted_peers)) dim_proc = None # start the NM - if remote.is_nm: - nm_proc = start_node_mgr( - log_dir, - remote.my_ip, - logv=logv, - max_threads=options.max_threads, - host=None if options.all_nics else remote.my_ip, - event_listeners=options.event_listeners, - ) if options.num_islands == 1: if remote.is_proxy: # Wait until the Island 
Manager is open @@ -645,44 +637,44 @@ def main(): options.monitor_port, ) else: - logger.warning( + LOGGER.warning( "Couldn't connect to the main drop manager, proxy not started" ) else: - logger.info(f"Starting island managers on nodes: {remote.dim_ips}") + LOGGER.info(f"Starting island managers on nodes: {remote.dim_ips}") if remote.my_ip in remote.dim_ips: dim_proc = start_dim(remote.nm_ips, log_dir, remote.my_ip, logv=logv) - pg = get_pg(options, remote.nm_ips, remote.dim_ips) + physical_graph = get_pg(options, remote.nm_ips, remote.dim_ips) monitoring_thread = submit_and_monitor( - pg, options, ISLAND_DEFAULT_REST_PORT + physical_graph, options, ISLAND_DEFAULT_REST_PORT ) monitoring_thread.join() stop_dims(remote.dim_ips) stop_nms(remote.nm_ips) if dim_proc is not None: # Stop DALiuGE. - logger.info("Stopping DALiuGE island manager on rank %d", remote.rank) + LOGGER.info("Stopping DALiuGE island manager on rank %d", remote.rank) utils.terminate_or_kill(dim_proc, 5) elif remote.is_highest_level_manager: - pg = get_pg(options, remote.nm_ips, remote.dim_ips) - remote.send_dim_nodes(pg) + physical_graph = get_pg(options, remote.nm_ips, remote.dim_ips) + remote.send_dim_nodes(physical_graph) # 7. make sure all DIMs are up running dim_ips_up = check_hosts( remote.dim_ips, ISLAND_DEFAULT_REST_PORT, timeout=MM_WAIT_TIME, retry=10 ) if len(dim_ips_up) < len(remote.dim_ips): - logger.warning( + LOGGER.warning( "Not all DIMs were up and running: %d/%d", len(dim_ips_up), len(remote.dim_ips), ) monitoring_thread = submit_and_monitor( - pg, options, MASTER_DEFAULT_REST_PORT + physical_graph, options, MASTER_DEFAULT_REST_PORT ) start_mm(remote.dim_ips, log_dir, logv=logv) monitoring_thread.join() diff --git a/daliuge-engine/dlg/deploy/start_helm_cluster.py b/daliuge-engine/dlg/deploy/start_helm_cluster.py new file mode 100644 index 000000000..5634c31ac --- /dev/null +++ b/daliuge-engine/dlg/deploy/start_helm_cluster.py @@ -0,0 +1,122 @@ +# +# ICRAR - International Centre for Radio Astronomy Research +# (c) UWA - The University of Western Australia, 2022 +# Copyright by UWA (in the framework of the ICRAR) +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +""" +A demo implementation of a Helm-based DAliuGE deployment. 
+ +Limitations: +- Assumes graphs will run on a single pod +- Does not support external graph components (yet) +""" +import argparse +import json +import os +import tempfile + +from dlg.dropmake import pg_generator +from dlg.deploy.helm_client import HelmClient + + +def get_pg(opts, node_managers: list, data_island_managers: list): + if not opts.logical_graph and not opts.physical_graph: + return [] + num_nms = len(node_managers) + num_dims = len(data_island_managers) + + if opts.logical_graph: + unrolled_graph = pg_generator.unroll(opts.logical_graph) + pgt = pg_generator.partition(unrolled_graph, algo='metis', num_partitons=num_nms, + num_islands=num_dims) + del unrolled_graph + else: + with open(opts.physical_graph, 'rb', encoding='utf-8') as pg_file: + pgt = json.load(pg_file) + physical_graph = pg_generator.resource_map(pgt, node_managers + data_island_managers) + # TODO: Add dumping to log-dir + return physical_graph + + +def start_helm(physical_graph_template, num_nodes: int): + # TODO: Dynamic helm chart logging dir + # TODO: Multiple node deployments + available_ips = ["127.0.0.1"] + pgt = json.loads(physical_graph_template) + pgt = pg_generator.partition(pgt, algo='metis', num_partitons=len(available_ips), + num_islands=len(available_ips)) + pg = pg_generator.resource_map(pgt, available_ips + available_ips) + with tempfile.TemporaryDirectory() as tmp_dir: + helm_client = HelmClient( + deploy_name='daliuge-daemon', + chart_name='daliuge-daemon', + deploy_dir=tmp_dir + ) + helm_client.create_helm_chart(json.dumps(pg)) + helm_client.launch_helm() + helm_client.submit_job() + helm_client.teardown() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + '-L', + '--logical-graph', + action="store", + type=str, + dest="logical_graph", + help="The filename of the logical graph to deploy", + default=None + ) + parser.add_argument( + "-P", + "--physical_graph", + action="store", + type=str, + dest="physical_graph", + help="The filename of the physical graph (template) to deploy", + default=None, + ) + + options = parser.parse_args() + if bool(options.logical_graph) == bool(options.physical_graph): + parser.error( + "Either a logical graph or physical graph filename must be specified" + ) + for graph_file_name in (options.logical_graph, options.physical_graph): + if graph_file_name and not os.path.exists(graph_file_name): + parser.error(f"Cannot locate graph_file at {graph_file_name}") + + available_ips = ["127.0.0.1"] + physical_graph = get_pg(options, available_ips, available_ips) + + helm_client = HelmClient( + deploy_name='daliuge-daemon', + chart_name='daliuge-daemon', + deploy_dir='/home/nicholas/dlg_temp/demo' + ) + helm_client.create_helm_chart(json.dumps(physical_graph)) + helm_client.launch_helm() + helm_client.submit_job() + helm_client.teardown() + + +if __name__ == "__main__": + main() diff --git a/daliuge-engine/pip/requirements.txt b/daliuge-engine/pip/requirements.txt index b696a713a..6dd9ae859 100644 --- a/daliuge-engine/pip/requirements.txt +++ b/daliuge-engine/pip/requirements.txt @@ -17,5 +17,6 @@ python-daemon pyzmq scp twine +pyyaml # 0.6 brings python3 support plus other fixes zerorpc >= 0.6 diff --git a/daliuge-engine/setup.py b/daliuge-engine/setup.py index 297cf1d62..10bbf8bee 100644 --- a/daliuge-engine/setup.py +++ b/daliuge-engine/setup.py @@ -133,6 +133,7 @@ def run(self): "python-daemon", "pyzmq", "scp", + "pyyaml", # 0.19.0 requires netifaces < 0.10.5, exactly the opposite of what *we* need "zeroconf >= 0.19.1", # 0.6 brings python3 
support plus other fixes
diff --git a/daliuge-engine/test/deploy/test_helm_client.py b/daliuge-engine/test/deploy/test_helm_client.py
new file mode 100644
index 000000000..f28bb3ac2
--- /dev/null
+++ b/daliuge-engine/test/deploy/test_helm_client.py
@@ -0,0 +1,113 @@
+#
+# ICRAR - International Centre for Radio Astronomy Research
+# (c) UWA - The University of Western Australia, 2019
+# Copyright by UWA (in the framework of the ICRAR)
+# All rights reserved
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+"""
+Module tests the helm chart translation and deployment functionality.
+"""
+import unittest
+import tempfile
+import os
+import sys
+import yaml
+import json
+
+from dlg.common.version import version as dlg_version
+from dlg.deploy.helm_client import HelmClient
+from dlg.common import Categories
+
+
+@unittest.skipIf(sys.version_info <= (3, 8), "Copying temp files fails on Python < 3.7")
+class TestHelmClient(unittest.TestCase):
+
+    def test_create_default_helm_chart(self):
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            helm_client = HelmClient(deploy_dir=tmp_dir, deploy_name='my_fun_name')
+            helm_client.create_helm_chart('[]')
+            chart_file_name = os.path.join(helm_client._chart_dir, "Chart.yaml")
+            with open(chart_file_name, 'r', encoding='utf-8') as chart_file:
+                chart_data = yaml.safe_load(chart_file)
+                self.assertEqual(helm_client._chart_name, chart_data['name'])
+                self.assertEqual(dlg_version, chart_data['appVersion'])
+
+    def test_custom_ports(self):
+        pass
+
+    def test_create_single_node_helm_chart(self):
+        pg = [
+            {"oid": "A", "type": "plain", "storage": Categories.MEMORY},
+            {
+                "oid": "B",
+                "type": "app",
+                "app": "dlg.apps.simple.SleepApp",
+                "inputs": ["A"],
+                "outputs": ["C"],
+            },
+            {"oid": "C", "type": "plain", "storage": Categories.MEMORY},
+        ]
+        for drop in pg:
+            drop["node"] = "127.0.0.1"
+            drop["island"] = "127.0.0.1"
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            helm_client = HelmClient(deploy_dir=tmp_dir, deploy_name='dlg-test')
+            helm_client.create_helm_chart(json.dumps(pg))
+            self.assertEqual(pg, json.loads(helm_client._physical_graph_file))
+            self.assertEqual(1, helm_client._num_islands)
+            self.assertEqual(3, helm_client._num_nodes)
+
+    @unittest.skip
+    def test_create_multi_node_helm_chart(self):
+        pg = [
+            {"oid": "A", "type": "plain", "storage": Categories.MEMORY, "node": "127.0.0.1",
+             "island": "127.0.0.1"},
+            {
+                "oid": "B",
+                "type": "app",
+                "app": "dlg.apps.simple.SleepApp",
+                "inputs": ["A"],
+                "outputs": ["C"],
+                "node": "127.0.0.1",
+                "island": "127.0.0.1"
+            },
+            {
+                "oid": "D",
+                "type": "app",
+                "app": "dlg.apps.simple.SleepApp",
+                "inputs": ["A"],
+                "outputs": ["E"],
+                "node": "127.0.0.2",
+                "island": "127.0.0.2"
+            },
+            {"oid": "C", "type": "plain", "storage": Categories.MEMORY, "node": "127.0.0.1",
+             "island": "127.0.0.1"},
+            {"oid": "E", "type": "plain", "storage": Categories.MEMORY, "node": "127.0.0.2",
+             "island": "127.0.0.2"}
+        ]
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            helm_client = HelmClient(deploy_dir=tmp_dir, deploy_name='dlg_test')
+            helm_client.create_helm_chart(json.dumps(pg))
+            # TODO: Assert translation works
+            self.assertEqual(2, helm_client._num_islands)
+            self.assertEqual(5, helm_client._num_nodes)
+            self.fail("Test not yet implemented")
+
+    @unittest.skip
+    def test_submit_job(self):
+        self.fail("Test not yet implemented")
diff --git a/daliuge-engine/test/deploy/test_slurm_utils.py b/daliuge-engine/test/deploy/test_slurm_utils.py
index a1faf6e32..eb75eed15 100644
--- a/daliuge-engine/test/deploy/test_slurm_utils.py
+++ b/daliuge-engine/test/deploy/test_slurm_utils.py
@@ -22,12 +22,12 @@ import unittest
 
-from dlg.deploy import slurm_utils
+from dlg.deploy import deployment_utils
 
 
 class TestSlurmUtils(unittest.TestCase):
     def assert_list_as_string(self, s, expected_list):
-        slurm_list = slurm_utils.list_as_string(s)
+        slurm_list = deployment_utils.list_as_string(s)
         self.assertEqual(expected_list, slurm_list)
 
     def test_list_as_string(self):
diff --git a/daliuge-k8s/helm/README.md b/daliuge-k8s/helm/README.md
index fb97dc941..f7a6bbb27 100644
--- a/daliuge-k8s/helm/README.md
+++ b/daliuge-k8s/helm/README.md
@@ -10,11 +10,12 @@ Finally, on minikube you may need to run the following
 
 NOTE: On MacOS you can run with --cleanup, which will start it and clean up afterwards. Not sure if this is the same on all platforms.
+NOTE: Using --values my-values.yaml will override any values specified in the values.yaml file.
 
 # Install/Setup
 
 From mychart directory
 
-helm install daliuge-daemon .
+helm install daliuge-daemon . --values my-values.yaml
 kubectl get svc -o wide
 curl -d '{"nodes": ["localhost"]}' -H "Content-Type: application/json" -X POST http://:9000/managers/island/start
 helm uninstall daliuge-daemon
diff --git a/daliuge-k8s/helm/mychart/.helmignore b/daliuge-k8s/helm/daliuge-daemon/.helmignore
similarity index 100%
rename from daliuge-k8s/helm/mychart/.helmignore
rename to daliuge-k8s/helm/daliuge-daemon/.helmignore
diff --git a/daliuge-k8s/helm/daliuge-daemon/Chart.yaml b/daliuge-k8s/helm/daliuge-daemon/Chart.yaml
new file mode 100644
index 000000000..2bfd71a42
--- /dev/null
+++ b/daliuge-k8s/helm/daliuge-daemon/Chart.yaml
@@ -0,0 +1,13 @@
+apiVersion: v2
+appVersion: 2.0.1
+description: DALiuGE k8s deployment
+home: https://github.com/ICRAR/daliuge/daliuge-k8s
+keywords:
+- daliuge
+- workflow
+kubeVersion: '>=1.10.0-0'
+name: daliuge-daemon
+sources:
+- https://github.com/ICRAR/daliuge/daliuge-k8s
+type: application
+version: 0.1.0
diff --git a/daliuge-k8s/helm/mychart/my-values.yaml b/daliuge-k8s/helm/daliuge-daemon/my-values.yaml
similarity index 100%
rename from daliuge-k8s/helm/mychart/my-values.yaml
rename to daliuge-k8s/helm/daliuge-daemon/my-values.yaml
diff --git a/daliuge-k8s/helm/mychart/templates/daliuge-daemon-configmap.yaml b/daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-configmap.yaml
similarity index 100%
rename from daliuge-k8s/helm/mychart/templates/daliuge-daemon-configmap.yaml
rename to daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-configmap.yaml
diff --git a/daliuge-k8s/helm/mychart/templates/daliuge-daemon-depl-store-minikube.yaml b/daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-depl-store-minikube.yaml
similarity index 88%
rename from daliuge-k8s/helm/mychart/templates/daliuge-daemon-depl-store-minikube.yaml
rename to daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-depl-store-minikube.yaml
index 42f3bc7f0..83dbd2770 100644
--- a/daliuge-k8s/helm/mychart/templates/daliuge-daemon-depl-store-minikube.yaml
+++ b/daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-depl-store-minikube.yaml
@@ -22,13 +22,13 @@ spec:
       - name: daliuge-daemon
         image: {{ .Values.containers.name }}
         ports:
-          - containerPort: {{ .Values.containers.ports.containerPort }}
+        - containerPort: {{ .Values.containers.ports.containerPort }}
         volumeMounts:
         - mountPath: {{ .Values.dlg_root_in_container }}
           name: dlg-mount
         env:
         - name: DLG_ROOT
-          valueFrom:
+          valueFrom:
             configMapKeyRef:
               name: daliuge-daemon-configmap
               key: dlg_root
diff --git a/daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-service.yaml b/daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-service.yaml
new file mode 100644
index 000000000..074d98812
--- /dev/null
+++ b/daliuge-k8s/helm/daliuge-daemon/templates/daliuge-daemon-service.yaml
@@ -0,0 +1,20 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: daliuge-daemon-service
+spec:
+  selector:
+    app: daliuge-daemon
+  ports:
+    - protocol: TCP
+      name: {{ .Values.service.daemon.name }}
+      port: {{ .Values.service.daemon.port }}
+      targetPort: {{ .Values.containers.ports.containerPort }}
+    - protocol: TCP
+      name: {{ .Values.service.deployment.name }}
+      port: {{ .Values.service.deployment.port }}
+      targetPort: {{ .Values.containers.ports.deploymentPort }}
+    - protocol: TCP
+      name: {{ .Values.service.nodemgr.name }}
+      port: {{ .Values.service.nodemgr.port }}
+      targetPort: {{ .Values.containers.ports.nodemanagerPort }}
\ No newline at end of file
diff --git a/daliuge-k8s/helm/daliuge-daemon/values.yaml b/daliuge-k8s/helm/daliuge-daemon/values.yaml
new file mode 100644
index 000000000..e51a922be
--- /dev/null
+++ b/daliuge-k8s/helm/daliuge-daemon/values.yaml
@@ -0,0 +1,19 @@
+name: daliuge-daemon
+dlg_root_on_cluster_nodes: /dlg
+dlg_root_in_container: /dlg
+containers:
+  name: icrar/daliuge-engine:2.0.1
+  ports:
+    containerPort: 9000
+    deploymentPort: 8001
+    nodemanagerPort: 8000
+service:
+  daemon:
+    name: daemon-port
+    port: 9000
+  deployment:
+    name: island-port
+    port: 8001
+  nodemgr:
+    name: node-manager-port
+    port: 8000
\ No newline at end of file
diff --git a/daliuge-k8s/helm/mychart/Chart.yaml b/daliuge-k8s/helm/mychart/Chart.yaml
deleted file mode 100644
index dd682a241..000000000
--- a/daliuge-k8s/helm/mychart/Chart.yaml
+++ /dev/null
@@ -1,12 +0,0 @@
-apiVersion: v2
-name: daliuge-daemon
-version: 1.0.0
-appVersion: v1.0.0
-home: https://github.com/ICRAR/daliuge/daliuge-k8s
-description: DALiuGE k8s deployment
-keywords:
-  - daliuge
-  - workflow
-sources:
-  - https://github.com/ICRAR/daliuge/daliuge-k8s
-kubeVersion: ">=1.10.0-0"
diff --git a/daliuge-k8s/helm/mychart/templates/daliuge-daemon-service.yaml b/daliuge-k8s/helm/mychart/templates/daliuge-daemon-service.yaml
deleted file mode 100644
index a98b525f1..000000000
--- a/daliuge-k8s/helm/mychart/templates/daliuge-daemon-service.yaml
+++ /dev/null
@@ -1,11 +0,0 @@
-apiVersion: v1
-kind: Service
-metadata:
-  name: daliuge-daemon-service
-spec:
-  selector:
-    app: daliuge-daemon
-  ports:
-    - protocol: TCP
-      port: {{ .Values.service.port }}
-      targetPort: {{ .Values.containers.ports.containerPort }}
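The daliuge-daemon-service introduced above exposes the daemon (9000), island/deployment manager (8001) and node manager (8000) ports defined in the new values.yaml. The snippet below is a minimal sketch of exercising the daemon port from Python, equivalent to the curl command in daliuge-k8s/helm/README.md; the service address is a placeholder you would take from kubectl get svc.

# Minimal sketch: ask the daemon (port 9000 above) to start an island manager,
# mirroring the curl command in daliuge-k8s/helm/README.md. The service IP is a
# placeholder; use the address reported by `kubectl get svc daliuge-daemon-service`.
import json
import urllib.request

SERVICE_IP = "10.0.0.1"  # placeholder, replace with the real service address

request = urllib.request.Request(
    url=f"http://{SERVICE_IP}:9000/managers/island/start",
    data=json.dumps({"nodes": ["localhost"]}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(request) as response:
    print(response.status, response.read().decode("utf-8"))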
diff --git a/daliuge-k8s/helm/mychart/values.yaml b/daliuge-k8s/helm/mychart/values.yaml
deleted file mode 100644
index aa64905a5..000000000
--- a/daliuge-k8s/helm/mychart/values.yaml
+++ /dev/null
@@ -1,9 +0,0 @@
-name: daliuge-daemon
-dlg_root_on_cluster_nodes: /dlg
-dlg_root_in_container: /dlg
-containers:
-  name: icrar/daliuge-engine:master
-  ports:
-    containerPort: 9000
-service:
-  port: 9000
diff --git a/daliuge-translator/test-requirements.txt b/daliuge-translator/test-requirements.txt
index 9fe7ad47d..0ad5db537 100644
--- a/daliuge-translator/test-requirements.txt
+++ b/daliuge-translator/test-requirements.txt
@@ -1,3 +1,4 @@
 gitpython
 ruamel.yaml==0.16.0; python_version=='2.7'
 typing>=3.7.4
+pyyaml>=6.0
diff --git a/docs/deployment.rst b/docs/deployment.rst
index 1543c4026..8688d21fe 100644
--- a/docs/deployment.rst
+++ b/docs/deployment.rst
@@ -66,6 +66,15 @@ Deployment with OpenOnDemand
 
 `OpenOnDemand `_ (OOD) is a system providing an interactive interface to remote compute resources. It is becoming increasingly popular with a number of HPC centers around the world. The two Australian research HPC centers Pawsey and NCI are planning to roll it out for their users. Independently we had realized that |daliuge| is missing an authentication, authorization and session management system and started looking into OOD as a solution for this. After a short evaluation we have started integrating OOD into the deployment for our small in-house compute cluster. In order to make this work we needed to implement an additional interface between the translator running on an external server (e.g. AWS) and OOD and then further on into the (SLURM) batch job system. This interface code is currently in a separate private git repository, but will be released as soon as we have finished testing it. The code mimics the |daliuge| data island manager's REST interface, but instead of launching the workflow directly it prepares a SLURM job submission script and places it into the queue. Users can then use the standard OOD web-pages to monitor the jobs and get access to the logs and results of the workflow execution. OOD allows the integration of multiple compute resources, including Kubernetes and also (to a certain degree) GCP, AWS and Azure. Once configured, users can choose to submit their jobs to any of those. Our OOD interface code has been implemented as an OOD embedded `Phusion Passenger `_ `Flask `_ application, which is `WSGI `_ compliant. Very little inside that application is OOD specific and can thus be easily ported to other deployment scenarios.
+
+Deployment with Kubernetes (Coming Soon)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Kubernetes is a canonical container orchestration system.
+We are building support to deploy workflows as helm charts, which will enable easier and more reliable deployments across more computing facilities.
+Support is currently limited, but watch this space.
+
+
 Component Deployment
 ====================
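To make the Kubernetes deployment path described above more concrete, the following is a minimal sketch of driving the new HelmClient programmatically. It relies only on the constructor arguments and the create_helm_chart() call exercised in test_helm_client.py earlier in this change set; the physical graph, deploy directory and deploy name are illustrative placeholders, and submit_job() is assumed to be the eventual entry point but is still untested (its test is skipped).

# Minimal sketch of driving the new HelmClient, based only on the calls
# exercised in test_helm_client.py above. The graph, deploy directory and
# deploy name are illustrative placeholders; submit_job() is assumed to be
# the eventual entry point but is still untested (its test is skipped).
import json
import tempfile

from dlg.common import Categories
from dlg.deploy.helm_client import HelmClient

physical_graph = [
    {"oid": "A", "type": "plain", "storage": Categories.MEMORY,
     "node": "127.0.0.1", "island": "127.0.0.1"},
    {"oid": "B", "type": "app", "app": "dlg.apps.simple.SleepApp",
     "inputs": ["A"], "outputs": ["C"],
     "node": "127.0.0.1", "island": "127.0.0.1"},
    {"oid": "C", "type": "plain", "storage": Categories.MEMORY,
     "node": "127.0.0.1", "island": "127.0.0.1"},
]

with tempfile.TemporaryDirectory() as deploy_dir:
    client = HelmClient(deploy_dir=deploy_dir, deploy_name="dlg-demo")
    client.create_helm_chart(json.dumps(physical_graph))  # translate the graph into a chart
    # client.submit_job()  # assumed follow-up step; not yet verified by the tests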