From 6630256d0881d481f8cab2be5362da705ba075f4 Mon Sep 17 00:00:00 2001 From: myxie <21298244@student.uwa.edu.au> Date: Fri, 26 Nov 2021 14:20:25 +0800 Subject: [PATCH] REF: https://github.com/ICRAR/daliuge/issues/83 As per https://networkx.org/documentation/latest/release/release_2.4.html, NetworkX uses `Graph.nodes[u]` to access a node `u` in the Graph. This update changes any `DiGraph.node[u]` access to `DiGraph.nodes[u]`. --- daliuge-translator/dlg/dropmake/dm_utils.py | 4 +- .../dlg/dropmake/pg_generator.py | 14 +- daliuge-translator/dlg/dropmake/scheduler.py | 707 ++++++++---------- .../dlg/dropmake/utils/antichains.py | 149 ++-- daliuge-translator/setup.py | 2 +- 5 files changed, 395 insertions(+), 481 deletions(-) diff --git a/daliuge-translator/dlg/dropmake/dm_utils.py b/daliuge-translator/dlg/dropmake/dm_utils.py index d3d4ba641..b22384d05 100644 --- a/daliuge-translator/dlg/dropmake/dm_utils.py +++ b/daliuge-translator/dlg/dropmake/dm_utils.py @@ -456,9 +456,9 @@ def convert_construct(lgo): app_node["key"] = node["key"] app_node["category"] = node[has_app] # node['application'] if has_app[0] == "i": - app_node["text"] = node["inputApplicationName"] + app_node["text"] = node["text"] else: - app_node["text"] = node["outputApplicationName"] + app_node["text"] = node["text"] if "mkn" in node: app_node["mkn"] = node["mkn"] diff --git a/daliuge-translator/dlg/dropmake/pg_generator.py b/daliuge-translator/dlg/dropmake/pg_generator.py index 9cc351efc..99cfd2cae 100644 --- a/daliuge-translator/dlg/dropmake/pg_generator.py +++ b/daliuge-translator/dlg/dropmake/pg_generator.py @@ -984,7 +984,7 @@ def pred_exec_time(self, app_drop_only=False, wk="weight", force_answer=False): return None if app_drop_only: lp = DAGUtil.get_longest_path(G, show_path=True)[0] - return sum(G.node[u].get(wk, 0) for u in lp) + return sum(G.nodes[u].get(wk, 0) for u in lp) else: return DAGUtil.get_longest_path(G, show_path=False)[1] @@ -1129,9 +1129,9 @@ def to_gojs_json(self, string_rep=True, outdict=None, visual=False): link = dict() link["from"] = myk from_dt = 0 if drop["type"] == DropType.PLAIN else 1 - to_dt = G.node[oup]["dt"] + to_dt = G.nodes[oup]["dt"] if from_dt == to_dt: - to_drop = G.node[oup]["drop_spec"] + to_drop = G.nodes[oup]["drop_spec"] if from_dt == 0: # add an extra app DROP extra_oid = "{0}_TransApp_{1}".format(oid, i) @@ -1177,7 +1177,7 @@ def to_gojs_json(self, string_rep=True, outdict=None, visual=False): # global graph updates # the new drop must have the same gid as the to_drop add_nodes.append( - (lid, 1, mydt, dropSpec, G.node[oup].get("gid", None)) + (lid, 1, mydt, dropSpec, G.nodes[oup].get("gid", None)) ) remove_edges.append((myk, oup)) add_edges.append((myk, lid)) @@ -1491,8 +1491,8 @@ def merge_partitions( GG = self._G part_edges = defaultdict(int) # k: from_gid + to_gid, v: sum_of_weight for e in GG.edges(data=True): - from_gid = GG.node[e[0]]["gid"] - to_gid = GG.node[e[1]]["gid"] + from_gid = GG.nodes[e[0]]["gid"] + to_gid = GG.nodes[e[1]]["gid"] k = "{0}**{1}".format(from_gid, to_gid) part_edges[k] += e[2]["weight"] @@ -1767,7 +1767,7 @@ def to_gojs_json(self, string_rep=True, outdict=None, visual=False): node_list = jsobj["nodeDataArray"] for node in node_list: - gid = G.node[node["key"]]["gid"] # gojs group_id + gid = G.nodes[node["key"]]["gid"] # gojs group_id if gid is None: raise GPGTException( "Node {0} does not have a Partition".format(node["key"]) diff --git a/daliuge-translator/dlg/dropmake/scheduler.py b/daliuge-translator/dlg/dropmake/scheduler.py index 
04e91c27b..5d1d3f7f8 100644 --- a/daliuge-translator/dlg/dropmake/scheduler.py +++ b/daliuge-translator/dlg/dropmake/scheduler.py @@ -1,3 +1,4 @@ + # ICRAR - International Centre for Radio Astronomy Research # (c) UWA - The University of Western Australia, 2015 # Copyright by UWA (in the framework of the ICRAR) @@ -39,30 +40,25 @@ DEBUG = 0 - class SchedulerException(Exception): pass - class Schedule(object): """ The scheduling solution with schedule-related properties """ - def __init__(self, dag, max_dop): self._dag = dag - self._max_dop = max_dop if type(max_dop) == int else max_dop.get("num_cpus", 1) + self._max_dop = max_dop if type(max_dop) == int else max_dop.get('num_cpus', 1) DAGUtil.label_schedule(self._dag) - self._lpl = DAGUtil.get_longest_path( - self._dag, default_weight=0, show_path=True - ) + self._lpl = DAGUtil.get_longest_path(self._dag, default_weight=0, show_path=True) self._wkl = None self._sma = None @property def makespan(self): return self._lpl[1] - + @property def longest_path(self): return self._lpl[0] @@ -73,21 +69,21 @@ def schedule_matrix(self): Return: a self._lpl x self._max_dop matrix (X - time, Y - resource unit / parallel lane) """ - if self._sma is None: + if (self._sma is None): G = self._dag N = max(self.makespan, 1) - if DEBUG: + if (DEBUG): lpl_str = [] lpl_c = 0 for lpn in self.longest_path: - ww = G.node[lpn].get("num_cpus", 0) + ww = G.nodes[lpn].get('num_cpus', 0) lpl_str.append("{0}({1})".format(lpn, ww)) lpl_c += ww logger.debug("lpl: %s", " -> ".join(lpl_str)) logger.debug("lplt = %d", int(lpl_c)) - + M = self._max_dop - # print("N (makespan) is ", N, "M is ", M) + #print("N (makespan) is ", N, "M is ", M) ma = np.zeros((M, N), dtype=int) pr = np.zeros((M), dtype=int) last_pid = -1 @@ -95,38 +91,33 @@ def schedule_matrix(self): topo_sort = nx.topological_sort(G) for n in topo_sort: - node = G.node[n] + node = G.nodes[n] try: - stt = node["stt"] - edt = node["edt"] + stt = node['stt'] + edt = node['edt'] except KeyError as ke: - raise SchedulerException( - "No schedule labels found: {0}".format(str(ke)) - ) - if edt == stt: + raise SchedulerException("No schedule labels found: {0}".format(str(ke))) + if (edt == stt): continue - if prev_n in G.predecessors(n): + if (prev_n in G.predecessors(n)): curr_pid = last_pid else: found = None for i in range(M): - if pr[i] <= stt: + if (pr[i] <= stt): found = i break - if found is None: - raise SchedulerException( - "Cannot find a idle PID, max_dop provided: {0}, actual max_dop: {1}\n Graph: {2}".format( - M, "DAGUtil.get_max_dop(G)", G.nodes(data=True) - ) - ) - # DAGUtil.get_max_dop(G), G.nodes(data=True))) + if (found is None): + raise SchedulerException("Cannot find a idle PID, max_dop provided: {0}, actual max_dop: {1}\n Graph: {2}".format(M, + 'DAGUtil.get_max_dop(G)', G.nodes(data=True))) + #DAGUtil.get_max_dop(G), G.nodes(data=True))) curr_pid = found ma[curr_pid, stt:edt] = n pr[curr_pid] = edt last_pid = curr_pid prev_n = n self._sma = ma - # print(ma) + #print(ma) return self._sma @property @@ -135,12 +126,12 @@ def workload(self): Return: (integer) the mean # of resource units per time unit consumed by the graph/partition """ - if self._wkl is None: + if (self._wkl is None): ma = self.schedule_matrix c = [] for i in range(ma.shape[1]): - c.append(np.count_nonzero(ma[:, i])) - self._wkl = int(np.mean(np.array(c))) # since METIS only accepts integer + c.append(np.count_nonzero(ma[:,i])) + self._wkl = int(np.mean(np.array(c))) # since METIS only accepts integer return self._wkl @property @@ -150,7 
+141,6 @@ def efficiency(self): """ return int(float(self.workload) / self._max_dop * 100) - class Partition(object): """ Logical partition, multiple (1 ~ N) of these can be placed onto a single @@ -158,7 +148,6 @@ class Partition(object): Logical partition can be nested, and it somewhat resembles the `dlg.manager.drop_manager` """ - def __init__(self, gid, max_dop): """ gid: cluster/partition id (string) @@ -167,7 +156,7 @@ def __init__(self, gid, max_dop): self._gid = gid self._dag = nx.DiGraph() self._ask_max_dop = max_dop - self._max_antichains = None # a list of max (width) antichains + self._max_antichains = None # a list of max (width) antichains self._lpl = None self._schedule = None self._max_dop = None @@ -194,7 +183,7 @@ def schedule(self): """ Get the schedule assocaited with this partition """ - if self._schedule is None: + if (self._schedule is None): self._schedule = Schedule(self._dag, self._max_dop) return self._schedule @@ -203,26 +192,27 @@ def recompute_schedule(self): return self.schedule def can_merge(self, that): - if self._max_dop + that._max_dop <= self._ask_max_dop: + if (self._max_dop + that._max_dop <= self._ask_max_dop): return True else: return False - # TODO re-implement this performance hog! - # self._tmp_merge_dag = nx.compose(self._dag, that._dag) - # return DAGUtil.get_max_dop(self._tmp_merge_dag) <= self._ask_max_dop + #TODO re-implement this performance hog! + #self._tmp_merge_dag = nx.compose(self._dag, that._dag) + #return DAGUtil.get_max_dop(self._tmp_merge_dag) <= self._ask_max_dop def merge(self, that): - if self._tmp_merge_dag is not None: + if (self._tmp_merge_dag is not None): self._dag = self._tmp_merge_dag self._tmp_merge_dag = None else: self._dag = nx.compose(self._dag, that._dag) - # self._max_dop + #self._max_dop + + #TODO add this performance hog! + #self._max_antichains = None - # TODO add this performance hog! - # self._max_antichains = None def can_add(self, u, v, gu, gv): """ @@ -230,48 +220,41 @@ def can_add(self, u, v, gu, gv): A node may be rejected due to reasons such as: DoP overflow or completion time deadline overdue, etc. """ - uw = gu["weight"] - vw = gv["weight"] - if len(self._dag.nodes()) == 0: + uw = gu['weight'] + vw = gv['weight'] + if (len(self._dag.nodes()) == 0): return (True, False, False) unew = u not in self._dag.node vnew = v not in self._dag.node - if DEBUG: + if (DEBUG): slow_max = DAGUtil.get_max_antichains(self._dag) fast_max = self._max_antichains - info = "Before: {0} - slow max: {1}, fast max: {2}, u: {3}, v: {4}, unew:{5}, vnew:{6}".format( - self._dag.edges(), slow_max, fast_max, u, v, unew, vnew - ) + info = "Before: {0} - slow max: {1}, fast max: {2}, u: {3}, v: {4}, unew:{5}, vnew:{6}".format(self._dag.edges(), + slow_max, fast_max, u, v, unew, vnew) logger.debug(info) - if len(slow_max) != len(fast_max): + if (len(slow_max) != len(fast_max)): raise SchedulerException("ERROR - {0}".format(info)) self._dag.add_node(u, weight=uw) self._dag.add_node(v, weight=vw) self._dag.add_edge(u, v) - if unew and vnew: + if (unew and vnew): mydop = DAGUtil.get_max_dop(self._dag) else: mydop = self.probe_max_dop(u, v, unew, vnew) - # TODO - put the following code in a unit test! - if DEBUG: - mydop_slow = DAGUtil.get_max_dop(self._dag) # - if mydop_slow != mydop: - err_msg = "u = {0}, v = {1}, unew = {2}, vnew = {3}".format( - u, v, unew, vnew - ) - raise SchedulerException( - "{2}: mydop = {0}, mydop_slow = {1}".format( - mydop, mydop_slow, err_msg - ) - ) + #TODO - put the following code in a unit test! 
+ if (DEBUG): + mydop_slow = DAGUtil.get_max_dop(self._dag)# + if (mydop_slow != mydop): + err_msg = "u = {0}, v = {1}, unew = {2}, vnew = {3}".format(u, v, unew, vnew) + raise SchedulerException("{2}: mydop = {0}, mydop_slow = {1}".format(mydop, mydop_slow, err_msg)) ret = False if mydop > self._ask_max_dop else True - if unew: + if (unew): self.remove(u) - if vnew: + if (vnew): self.remove(v) return (ret, unew, vnew) @@ -282,47 +265,47 @@ def add(self, u, v, gu, gv, sequential=False, global_dag=None): """ # if (self.partition_id == 180): # logger.debug("u = ", u, ", v = ", v, ", partition = ", self.partition_id) - uw = gu["weight"] - vw = gv["weight"] + uw = gu['weight'] + vw = gv['weight'] unew = u not in self._dag.node vnew = v not in self._dag.node - self._dag.add_node(u, weight=uw, num_cpus=gu["num_cpus"]) - self._dag.add_node(v, weight=vw, num_cpus=gv["num_cpus"]) + self._dag.add_node(u, weight=uw, num_cpus=gu['num_cpus']) + self._dag.add_node(v, weight=vw, num_cpus=gv['num_cpus']) self._dag.add_edge(u, v) - if unew and vnew: # we know this is fast + if (unew and vnew): # we know this is fast self._max_antichains = DAGUtil.get_max_antichains(self._dag) self._max_dop = 1 else: - if sequential and (global_dag is not None): + if (sequential and (global_dag is not None)): # break potential antichain to sequential chain - if unew: + if (unew): v_ups = nx.ancestors(self._dag, v) for vup in v_ups: - if u == vup: + if (u == vup): continue - if len(list(self._dag.predecessors(vup))) == 0: + if (len(list(self._dag.predecessors(vup))) == 0): # link u to "root" parent of v to break antichain self._dag.add_edge(u, vup) # change the original global graph global_dag.add_edge(u, vup, weight=0) - if not nx.is_directed_acyclic_graph(global_dag): + if (not nx.is_directed_acyclic_graph(global_dag)): global_dag.remove_edge(u, vup) else: u_downs = nx.descendants(self._dag, u) for udo in u_downs: - if udo == v: + if (udo == v): continue - if len(list(self._dag.successors(udo))) == 0: + if (len(list(self._dag.successors(udo))) == 0): # link "leaf" children of u to v to break antichain self._dag.add_edge(udo, v) # change the original global graph global_dag.add_edge(udo, v, weight=0) - if not nx.is_directed_acyclic_graph(global_dag): + if (not nx.is_directed_acyclic_graph(global_dag)): global_dag.remove_edge(udo, v) self._max_dop = self.probe_max_dop(u, v, unew, vnew, update=True) - # self._max_dop = DAGUtil.get_max_dop(self._dag)# this is too slow! + #self._max_dop = DAGUtil.get_max_dop(self._dag)# this is too slow! 
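can_add()/add() above accept or reject an edge by probing the partition's degree of parallelism (DoP), which is the size of a maximum antichain of the partition DAG: the largest set of nodes none of which can reach another. A minimal illustration of that notion with plain networkx, on a toy diamond graph rather than the DAGUtil helpers:

    import networkx as nx

    G = nx.DiGraph()
    G.add_edges_from([(1, 2), (1, 3), (2, 4), (3, 4)])  # diamond: 2 and 3 are incomparable

    # nx.antichains enumerates every set of mutually unreachable nodes;
    # the size of the largest one is the maximum degree of parallelism.
    max_dop = max(len(ac) for ac in nx.antichains(G))
    print(max_dop)  # 2 -> nodes 2 and 3 can run concurrently

The partition keeps an incremental list of maximum antichains (_max_antichains) precisely to avoid this exhaustive enumeration on every edge insertion.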
def remove(self, n): """ @@ -342,60 +325,55 @@ def probe_max_dop(self, u, v, unew, vnew, update=False): An incremental antichain (which appears significantly more efficient than the networkx antichains) But only works for DoP, not for weighted width """ - if self._max_antichains is None: + if (self._max_antichains is None): new_ac = DAGUtil.get_max_antichains(self._dag) - if update: + if (update): self._max_antichains = new_ac - if len(new_ac) == 0: - if update: + if (len(new_ac) == 0): + if (update): self._max_antichains = None return 0 else: return len(new_ac[0]) else: - if update and self._tmp_new_ac is not None: + if (update and self._tmp_new_ac is not None): self._max_antichains, md = self._tmp_new_ac self._tmp_new_ac = None return md - if unew: + if (unew): ups = nx.descendants(self._dag, u) new_node = u - elif vnew: + elif (vnew): ups = nx.ancestors(self._dag, v) new_node = v else: raise SchedulerException("u v are both new/old") new_ac = [] md = 1 - for ( - ma - ) in ( - self._max_antichains - ): # missing elements in the current max_antichains! - # incremental updates + for ma in self._max_antichains: # missing elements in the current max_antichains! + #incremental updates found = False for n in ma: - if n in ups: + if (n in ups): found = True break - if not found: + if (not found): mma = list(ma) mma.append(new_node) new_ac.append(mma) - if len(mma) > md: + if (len(mma) > md): md = len(mma) - elif len(ma) > md: + elif (len(ma) > md): md = len(ma) - new_ac.append(ma) # carry over, then prune it - if len(new_ac) > 0: + new_ac.append(ma) # carry over, then prune it + if (len(new_ac) > 0): self._tmp_new_ac = (new_ac, md) - if update: + if (update): self._max_antichains = new_ac return md else: raise SchedulerException("No antichains") - @property def cardinality(self): return len(self._dag.nodes()) @@ -407,21 +385,20 @@ class KFamilyPartition(Partition): the Theorem 3.1 in http://fmdb.cs.ucla.edu/Treports/930014.pdf """ - def __init__(self, gid, max_dop, global_dag=None): """ max_dop: dict with key: resource_attributes (string) value: resource_capacity (integer) """ mtype = type(max_dop) - if mtype == int: + if (mtype == int): # backward compatible - max_dop = {"num_cpus": max_dop} - elif mtype == dict: + max_dop = {'num_cpus': max_dop} + elif (mtype == dict): pass else: - raise SchedulerException("Invalid max_dop type: %r" % mtype) - + raise SchedulerException('Invalid max_dop type: %r' % mtype) + super(KFamilyPartition, self).__init__(gid, max_dop) self._bpg = nx.DiGraph() self._global_dag = global_dag @@ -435,53 +412,53 @@ def add_node(self, u): Add a single node u to the partition """ kwargs = dict() - if self._tmp_max_dop is None: + if (self._tmp_max_dop is None): self._tmp_max_dop = dict() self_global_dag = self._global_dag for _w_attr in self._w_attr: - u_aw = self_global_dag.node[u].get(_w_attr, 1) + u_aw = self_global_dag.nodes[u].get(_w_attr, 1) kwargs[_w_attr] = u_aw - kwargs["weight"] = self_global_dag.node[u].get("weight", 5) + kwargs['weight'] = self_global_dag.nodes[u].get('weight', 5) self._dag.add_node(u, **kwargs) for k in self._w_attr: self._tmp_max_dop[k] = get_max_weighted_antichain(self._dag, w_attr=k)[0] - self._max_dop = self._tmp_max_dop + self._max_dop = self._tmp_max_dop def can_merge(self, that, u, v): - """ """ + """ + """ dag = nx.compose(self._dag, that._dag) - if u is not None: + if (u is not None ): dag.add_edge(u, v) tmp_max_dop = copy.deepcopy(self._tmp_max_dop) for _w_attr in self._w_attr: mydop = get_max_weighted_antichain(dag, w_attr=_w_attr)[0] 
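can_merge() above evaluates the DoP of the union of the two partition DAGs; nx.compose builds exactly that union (node and edge sets merged, with the second graph's attributes winning on clashes). A tiny, self-contained sketch of why merging two independent partitions can raise the DoP:

    import networkx as nx

    a = nx.DiGraph([(1, 2)])       # one sequential chain, DoP 1
    b = nx.DiGraph([(3, 4)])       # another, independent chain, DoP 1
    merged = nx.compose(a, b)      # union of both node/edge sets
    print(sorted(merged.edges()))  # [(1, 2), (3, 4)]
    # {1, 3} is now an antichain of size 2, so the merged partition has DoP 2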
curr_max = max(self._max_dop[_w_attr], that._max_dop[_w_attr]) - - if mydop <= curr_max: + + if (mydop <= curr_max): # if you don't increase DoP, we accept that immediately tmp_max_dop[_w_attr] = curr_max - elif mydop > self._ask_max_dop[_w_attr]: + elif (mydop > self._ask_max_dop[_w_attr]): return False else: tmp_max_dop[_w_attr] = mydop - - self._tmp_max_dop = tmp_max_dop # only change it when returning True + + self._tmp_max_dop = tmp_max_dop # only change it when returning True return True def merge(self, that, u, v): self._dag = nx.compose(self._dag, that._dag) - if u is not None: + if (u is not None): self._dag.add_edge(u, v) - if self._tmp_max_dop is not None: + if (self._tmp_max_dop is not None): self._max_dop = self._tmp_max_dop - # print("Gid %d just merged with DoP %d" % (self._gid, self._tmp_max_dop)) + #print("Gid %d just merged with DoP %d" % (self._gid, self._tmp_max_dop)) else: # we could recalcuate it again, but we are lazy! raise SchedulerException("can_merge was not probed before add()") - -class Scheduler(object): +class Scheduler(object): """ Static Scheduling consists of three steps: 1. partition the DAG into an optimal number (M) of partitions @@ -497,14 +474,14 @@ def __init__(self, drop_list, max_dop=8, dag=None): turn drop_list into DAG, and check its validity """ self._drop_list = drop_list - if dag is None: + if (dag is None): self._dag = DAGUtil.build_dag_from_drops(self._drop_list) else: self._dag = dag self._max_dop = max_dop - self._parts = None # partitions - self._part_dict = dict() # {gid : part} - self._part_edges = [] # edges amongst all partitions + self._parts = None # partitions + self._part_dict = dict() #{gid : part} + self._part_edges = [] # edges amongst all partitions def partition_dag(self): raise SchedulerException("Not implemented. Try subclass instead") @@ -515,54 +492,56 @@ def merge_partitions(self, num_partitions, bal_cond=1): implemented using METIS for now bal_cond: load balance condition (integer): - 0 - workload, + 0 - workload, 1 - CPU count (faster to evaluate than workload) """ # 1. 
build the bi-directional graph (each partition is a node) metis = DAGUtil.import_metis() G = nx.Graph() st_gid = len(self._drop_list) + len(self._parts) + 1 - if bal_cond == 0: - G.graph["node_weight_attr"] = ["wkl", "eff"] + if (bal_cond == 0): + G.graph['node_weight_attr'] = ['wkl', 'eff'] for part in self._parts: sc = part.schedule G.add_node(part.partition_id, wkl=sc.workload, eff=sc.efficiency) else: - G.graph["node_weight_attr"] = "cc" + G.graph['node_weight_attr'] = 'cc' for part in self._parts: - # sc = part.schedule + #sc = part.schedule pdop = part._max_dop - # TODO add memory as one of the LB condition too - cc_eval = pdop if type(pdop) == int else pdop.get("num_cpus", 1) + #TODO add memory as one of the LB condition too + cc_eval = pdop if type(pdop) == int else pdop.get('num_cpus', 1) G.add_node(part.partition_id, cc=cc_eval) for e in self._part_edges: u = e[0] v = e[1] - ugid = self._dag.node[u].get("gid", None) - vgid = self._dag.node[v].get("gid", None) - G.add_edge(ugid, vgid) # repeating is fine - ew = self._dag.adj[u][v]["weight"] + ugid = self._dag.nodes[u].get('gid', None) + vgid = self._dag.nodes[v].get('gid', None) + G.add_edge(ugid, vgid) # repeating is fine + ew = self._dag.adj[u][v]['weight'] try: - G[ugid][vgid]["weight"] += ew + G[ugid][vgid]['weight'] += ew except KeyError: - G[ugid][vgid]["weight"] = ew - # DAGUtil.metis_part(G, 15) + G[ugid][vgid]['weight'] = ew + #DAGUtil.metis_part(G, 15) # since METIS does not allow zero edge weight, reset them to one for e in G.edges(data=True): - if e[2]["weight"] == 0: - e[2]["weight"] = 1 - # logger.debug(G.nodes(data=True)) - (edgecuts, metis_parts) = metis.part_graph(G, nparts=num_partitions, ufactor=1) - - for node, pt in zip(G.nodes(), metis_parts): # note min(pt) == 0 + if (e[2]['weight'] == 0): + e[2]['weight'] = 1 + #logger.debug(G.nodes(data=True)) + (edgecuts, metis_parts) = metis.part_graph(G, + nparts=num_partitions, + ufactor=1) + + for node, pt in zip(G.nodes(), metis_parts): # note min(pt) == 0 parent_id = pt + st_gid child_part = self._part_dict[node] child_part.parent_id = parent_id - # logger.debug("Part {0} --> Cluster {1}".format(child_part.partition_id, parent_id)) - # parent_part = Partition(parent_id, None) - # self._parts.append(parent_part) - # logger.debug("Edgecuts of merged partitions: ", edgecuts) + #logger.debug("Part {0} --> Cluster {1}".format(child_part.partition_id, parent_id)) + #parent_part = Partition(parent_id, None) + #self._parts.append(parent_part) + #logger.debug("Edgecuts of merged partitions: ", edgecuts) return edgecuts def map_partitions(self): @@ -571,7 +550,6 @@ def map_partitions(self): """ pass - class MySarkarScheduler(Scheduler): """ Based on "V. 
Sarkar, Partitioning and Scheduling Parallel Programs for Execution on @@ -589,10 +567,9 @@ class MySarkarScheduler(Scheduler): Similar ideas: http://stackoverflow.com/questions/3974731 """ - def __init__(self, drop_list, max_dop=8, dag=None, dump_progress=False): super(MySarkarScheduler, self).__init__(drop_list, max_dop=max_dop, dag=dag) - self._sspace = [3] * len(self._dag.edges()) # all edges are not zeroed + self._sspace = [3] * len(self._dag.edges()) # all edges are not zeroed self._dump_progress = dump_progress def override_cannot_add(self): @@ -610,7 +587,8 @@ def is_time_critical(self, u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el): logger.debug("MySarkar time criticality called") return True - def _merge_two_parts(self, ugid, vgid, u, v, gu, gv, g_dict, parts, G): + def _merge_two_parts(self, ugid, vgid, + u, v, gu, gv, g_dict, parts, G): """ Merge two parts associated with u and v respectively @@ -620,13 +598,13 @@ def _merge_two_parts(self, ugid, vgid, u, v, gu, gv, g_dict, parts, G): """ # get the new part should we go ahead # the new part should be one of partu or partv - # print("\nMerging ugid %d and vgid %d, u %d and v %d" % (ugid, vgid, u, v)) + #print("\nMerging ugid %d and vgid %d, u %d and v %d" % (ugid, vgid, u, v)) l_gid = min(ugid, vgid) r_gid = max(ugid, vgid) part_new = g_dict[l_gid] part_removed = g_dict[r_gid] - if not part_new.can_merge(part_removed, u, v): + if (not part_new.can_merge(part_removed, u, v)): return None part_new.merge(part_removed, u, v) @@ -634,24 +612,24 @@ def _merge_two_parts(self, ugid, vgid, u, v, gu, gv, g_dict, parts, G): # Get hold of all gnodes that belong to "part_removed" # and re-assign them to the new partitions for n in part_removed._dag.nodes(): - G.node[n]["gid"] = l_gid + G.nodes[n]['gid'] = l_gid index = None for i, part in enumerate(parts): p_gid = part._gid - if p_gid > r_gid: + if (p_gid > r_gid): g_dict[p_gid - 1] = part part._gid -= 1 for n in part._dag.nodes(): - G.node[n]["gid"] = part._gid - elif p_gid == r_gid: - # index = len(parts) - i - 1 + G.nodes[n]['gid'] = part._gid + elif (p_gid == r_gid): + #index = len(parts) - i - 1 index = i del g_dict[p_gid] - if index is None: + if (index is None): raise SchedulerException("Failed to find r_gid") - parts[:] = parts[0:index] + parts[index + 1 :] + parts[:] = parts[0:index] + parts[index + 1:] return part_new @@ -667,21 +645,20 @@ def reduce_partitions(self, parts, g_dict, G): """ done_reduction = False num_reductions = 0 - # TODO consider other w_attrs other than CPUs! - parts.sort(key=lambda x: x._max_dop["num_cpus"]) - while not done_reduction: + #TODO consider other w_attrs other than CPUs! 
+ parts.sort(key=lambda x: x._max_dop['num_cpus']) + while (not done_reduction): for i, partA in enumerate(parts): - if i < len(parts) - 1: + if (i < len(parts) - 1): partB = parts[i + 1] - new_part = self._merge_two_parts( - partA._gid, partB._gid, None, None, None, None, g_dict, parts, G - ) - if new_part is not None: + new_part = self._merge_two_parts(partA._gid, partB._gid, None, None, + None, None, g_dict, parts, G) + if (new_part is not None): num_reductions += 1 - break # force re-sorting + break # force re-sorting else: done_reduction = True - logger.info("Performed reductions %d times", num_reductions) + logger.info('Performed reductions %d times', num_reductions) break def partition_dag(self): @@ -694,66 +671,63 @@ def partition_dag(self): G = self._dag st_gid = len(self._drop_list) + 1 init_c = st_gid - el = sorted(G.edges(data=True), key=lambda ed: ed[2]["weight"] * -1) + el = sorted(G.edges(data=True), key=lambda ed: ed[2]['weight'] * -1) stt = time.time() topo_sorted = nx.topological_sort(G) - g_dict = self._part_dict # dict() #{gid : Partition} + g_dict = self._part_dict#dict() #{gid : Partition} curr_lpl = None parts = [] plots_data = [] dump_progress = self._dump_progress for n in G.nodes(data=True): - n[1]["gid"] = st_gid + n[1]['gid'] = st_gid part = KFamilyPartition(st_gid, self._max_dop, global_dag=G) part.add_node(n[0]) g_dict[st_gid] = part - parts.append(part) # will it get rejected? + parts.append(part) # will it get rejected? st_gid += 1 for i, e in enumerate(el): u = e[0] - gu = G.node[u] + gu = G.nodes[u] v = e[1] - gv = G.node[v] - ow = G.adj[u][v]["weight"] - G.adj[u][v]["weight"] = 0 # edge zeroing - ugid = gu.get("gid", None) - vgid = gv.get("gid", None) - if ugid != vgid: # merge existing parts - part = self._merge_two_parts(ugid, vgid, u, v, gu, gv, g_dict, parts, G) - if part is not None: + gv = G.nodes[v] + ow = G.adj[u][v]['weight'] + G.adj[u][v]['weight'] = 0 #edge zeroing + ugid = gu.get('gid', None) + vgid = gv.get('gid', None) + if (ugid != vgid): # merge existing parts + part = self._merge_two_parts(ugid, vgid, + u, v, gu, gv, g_dict, parts, G) + if (part is not None): st_gid -= 1 self._sspace[i] = 1 else: - G.adj[u][v]["weight"] = ow + G.adj[u][v]['weight'] = ow self._part_edges.append(e) - if dump_progress: + if (dump_progress): bb = np.median([pp._tmp_max_dop for pp in parts]) - curr_lpl = DAGUtil.get_longest_path( - G, show_path=False, topo_sort=topo_sorted - )[1] - plots_data.append("%d,%d,%d" % (curr_lpl, len(parts), bb)) + curr_lpl = DAGUtil.get_longest_path(G, show_path=False, + topo_sort=topo_sorted)[1] + plots_data.append('%d,%d,%d' % (curr_lpl, len(parts), bb)) self.reduce_partitions(parts, g_dict, G) edt = time.time() - stt self._parts = parts - if dump_progress: - with open("/tmp/%.3f_lpl_parts.csv" % time.time(), "w") as of: + if (dump_progress): + with open('/tmp/%.3f_lpl_parts.csv' % time.time(), 'w') as of: of.writelines(os.linesep.join(plots_data)) - if curr_lpl is None: - curr_lpl = DAGUtil.get_longest_path( - G, show_path=False, topo_sort=topo_sorted - )[1] + if (curr_lpl is None): + curr_lpl = DAGUtil.get_longest_path(G, show_path=False, + topo_sort=topo_sorted)[1] return ((st_gid - init_c), curr_lpl, edt, parts) - class MinNumPartsScheduler(MySarkarScheduler): """ A special type of partition that aims to schedule the DAG on time but at minimum cost. In this particular case, the cost is the number of partitions that will be generated. 
The assumption is # of partitions (with certain DoP) more or less represents resource footprint. """ - def __init__(self, drop_list, deadline, max_dop=8, dag=None, optimistic_factor=0.5): super(MinNumPartsScheduler, self).__init__(drop_list, max_dop=max_dop, dag=dag) self._deadline = deadline @@ -780,22 +754,22 @@ def is_time_critical(self, u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el): probility = (num of edges need to be zeroed to meet the deadline) / (num of remaining unzeroed edges) """ - if unew and vnew: + if (unew and vnew): return True # compute time criticality probility ttlen = float(len(rem_el)) - if ttlen == 0: + if (ttlen == 0): return False c = 0 for i, e in enumerate(rem_el): c = i - edge_weight = self._dag.edge[e[0]][e[1]]["weight"] - if (curr_lpl - edge_weight) <= self._deadline: + edge_weight = self._dag.edge[e[0]][e[1]]['weight'] + if ((curr_lpl - edge_weight) <= self._deadline): break # probability that remaining edges will be zeroed in order to meet the deadline prob = (c + 1) / ttlen time_critical = True if (prob > self._optimistic_factor) else False - # print "time criticality is {0}, prob is {1}".format(time_critical, prob) + #print "time criticality is {0}, prob is {1}".format(time_critical, prob) return time_critical # if (time_critical): # # enforce sequentialisation @@ -809,7 +783,6 @@ def is_time_critical(self, u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el): # else: # join the partition to minimise num_part # return True - class PSOScheduler(Scheduler): """ Use the Particle Swarm Optimisation to guide the Sarkar algorithm @@ -837,13 +810,10 @@ class PSOScheduler(Scheduler): based on X[i] value, reject or linearisation (2) returns makespan """ - - def __init__( - self, drop_list, max_dop=8, dag=None, deadline=None, topk=30, swarm_size=40 - ): + def __init__(self, drop_list, max_dop=8, dag=None, deadline=None, topk=30, swarm_size=40): super(PSOScheduler, self).__init__(drop_list, max_dop=max_dop, dag=dag) self._deadline = deadline - # search space: key - combination of X[i] (string), + #search space: key - combination of X[i] (string), # val - a tuple of (critical_path (int), num_parts (int)) self._sspace_dict = dict() self._topk = topk @@ -867,33 +837,27 @@ def partition_dag(self): lb = [0.99] * self._leng ub = [3.01] * self._leng stt = time.time() - if self._deadline is None: + if (self._deadline is None): xopt, fopt = pso(self.objective_func, lb, ub, swarmsize=self._swarm_size) else: - xopt, fopt = pso( - self.objective_func, - lb, - ub, - ieqcons=[self.constrain_func], - swarmsize=self._swarm_size, - ) + xopt, fopt = pso(self.objective_func, lb, ub, ieqcons=[self.constrain_func], swarmsize=self._swarm_size) curr_lpl, num_parts, parts, g_dict = self._partition_G(G, xopt) - # curr_lpl, num_parts, parts, g_dict = self.objective_func(xopt) + #curr_lpl, num_parts, parts, g_dict = self.objective_func(xopt) self._part_dict = g_dict edt = time.time() - # print "PSO scheduler took {0} seconds".format(edt - stt) + #print "PSO scheduler took {0} seconds".format(edt - stt) st_gid = len(self._drop_list) + 1 + num_parts for n in G.nodes(data=True): - if not "gid" in n[1]: - n[1]["gid"] = st_gid + if not 'gid' in n[1]: + n[1]['gid'] = st_gid part = Partition(st_gid, self._max_dop) - part.add_node(n[0], n[1].get("weight", 1)) + part.add_node(n[0], n[1].get('weight', 1)) g_dict[st_gid] = part - parts.append(part) # will it get rejected? + parts.append(part) # will it get rejected? 
num_parts += 1 self._parts = parts - # print "call counts ", self._call_counts + #print "call counts ", self._call_counts return (num_parts, curr_lpl, edt - stt, parts) def _partition_G(self, G, x): @@ -901,88 +865,83 @@ def _partition_G(self, G, x): A helper function to partition G based on a given scheme x subject to constraints imposed by each partition's DoP """ - # print x + #print x st_gid = len(self._drop_list) + 1 init_c = st_gid - el = sorted(G.edges(data=True), key=lambda ed: ed[2]["weight"] * -1) - # topo_sorted = nx.topological_sort(G) - # g_dict = self._part_dict#dict() #{gid : Partition} + el = sorted(G.edges(data=True), key=lambda ed: ed[2]['weight'] * -1) + #topo_sorted = nx.topological_sort(G) + #g_dict = self._part_dict#dict() #{gid : Partition} g_dict = dict() parts = [] for i, e in enumerate(el): pos = int(round(x[i])) - if pos == 3: # 10 non_zero + 1 + if (pos == 3): #10 non_zero + 1 continue - elif pos == 2: # 01 zero with linearisation + 1 + elif (pos == 2):#01 zero with linearisation + 1 linear = True - elif pos == 1: # 00 zero without linearisation + 1 + elif (pos == 1): #00 zero without linearisation + 1 linear = False else: raise SchedulerException("PSO position out of bound: {0}".format(pos)) u = e[0] - gu = G.node[u] + gu = G.nodes[u] v = e[1] - gv = G.node[v] - ow = G.adj[u][v]["weight"] - G.adj[u][v]["weight"] = 0 # edge zeroing + gv = G.nodes[v] + ow = G.adj[u][v]['weight'] + G.adj[u][v]['weight'] = 0 #edge zeroing recover_edge = False - ugid = gu.get("gid", None) - vgid = gv.get("gid", None) - if ugid and (not vgid): + ugid = gu.get('gid', None) + vgid = gv.get('gid', None) + if (ugid and (not vgid)): part = g_dict[ugid] - elif (not ugid) and vgid: + elif ((not ugid) and vgid): part = g_dict[vgid] - elif not ugid and (not vgid): + elif (not ugid and (not vgid)): part = Partition(st_gid, self._max_dop) g_dict[st_gid] = part - parts.append(part) # will it get rejected? + parts.append(part) # will it get rejected? st_gid += 1 - else: # elif (ugid and vgid): + else: #elif (ugid and vgid): # cannot change Partition once is in! 
part = None - # uw = gu['weight'] - # vw = gv['weight'] + #uw = gu['weight'] + #vw = gv['weight'] - if part is None: + if (part is None): recover_edge = True else: ca, unew, vnew = part.can_add(u, v, gu, gv) - if ca: + if (ca): # ignore linear flag, add it anyway part.add(u, v, gu, gv) - gu["gid"] = part._gid - gv["gid"] = part._gid + gu['gid'] = part._gid + gv['gid'] = part._gid else: - if linear: + if (linear): part.add(u, v, gu, gv, sequential=True, global_dag=G) - gu["gid"] = part._gid - gv["gid"] = part._gid + gu['gid'] = part._gid + gv['gid'] = part._gid else: - recover_edge = True # outright rejection - if recover_edge: - G.adj[u][v]["weight"] = ow + recover_edge = True #outright rejection + if (recover_edge): + G.adj[u][v]['weight'] = ow self._part_edges.append(e) self._call_counts += 1 - # print "called {0} times, len parts = {1}".format(self._call_counts, len(parts)) - return ( - DAGUtil.get_longest_path(G, show_path=False)[1], - len(parts), - parts, - g_dict, - ) + #print "called {0} times, len parts = {1}".format(self._call_counts, len(parts)) + return (DAGUtil.get_longest_path(G, show_path=False)[1], len(parts), parts, g_dict) def constrain_func(self, x): """ Deadline - critical_path >= 0 """ - if self._deadline is None: + if (self._deadline is None): raise SchedulerException("Deadline is None, cannot apply constraints!") - sk = "".join([str(int(round(xi))) for xi in x[0 : self._topk]]) + sk = ''.join([str(int(round(xi))) for xi in x[0:self._topk]]) stuff = self._sspace_dict.get(sk, None) - if stuff is None: + if (stuff is None): G = self._lite_dag.copy() stuff = self._partition_G(G, x) self._sspace_dict[sk] = stuff[0:2] @@ -995,16 +954,16 @@ def objective_func(self, x): indices of x is identical to the indices in G.edges().sort(key='weight') """ # first check if the solution is already available in the search space - sk = "".join([str(int(round(xi))) for xi in x[0 : self._topk]]) - stuff = self._sspace_dict.get(sk, None) # TODO is this atomic operation? - if stuff is None: + sk = ''.join([str(int(round(xi))) for xi in x[0:self._topk]]) + stuff = self._sspace_dict.get(sk, None) #TODO is this atomic operation? + if (stuff is None): # make a deep copy to avoid mix up multiple particles, # each of which has multiple iterations G = self._lite_dag.copy() stuff = self._partition_G(G, x) self._sspace_dict[sk] = stuff[0:2] del G - if self._deadline is None: + if (self._deadline is None): return stuff[0] else: return stuff[1] @@ -1014,11 +973,8 @@ class DAGUtil(object): """ Helper functions dealing with DAG """ - @staticmethod - def get_longest_path( - G, weight="weight", default_weight=1, show_path=True, topo_sort=None - ): + def get_longest_path(G, weight='weight', default_weight=1, show_path=True, topo_sort=None): """ Ported from: https://github.com/networkx/networkx/blob/master/networkx/algorithms/dag.py @@ -1032,31 +988,24 @@ def get_longest_path( :return: a tuple with two elements: `path` (list), the longest path, and `path_length` (float) the length of the longest path. 
""" - dist = {} # stores {v : (length, u)} - if topo_sort is None: + dist = {} # stores {v : (length, u)} + if (topo_sort is None): topo_sort = nx.topological_sort(G) for v in topo_sort: us = [ - ( - dist[u][0] - + data.get(weight, default_weight) # accumulate - + G.node[u].get(weight, 0) # edge weight - + ( # u node weight - G.node[v].get(weight, 0) - if len(list(G.successors(v))) == 0 - else 0 - ), # v node weight if no successor - u, - ) - for u, data in G.pred[v].items() - ] + (dist[u][0] + #accumulate + data.get(weight, default_weight) + #edge weight + G.nodes[u].get(weight, 0) + # u node weight + (G.nodes[v].get(weight, 0) if len(list(G.successors(v))) == 0 else 0), # v node weight if no successor + u) + for u, data in G.pred[v].items()] # Use the best predecessor if there is one and its distance is non-negative, otherwise terminate. maxu = max(us) if us else (0, v) dist[v] = maxu if maxu[0] >= 0 else (0, v) u = None v = max(dist, key=dist.get) lp = dist[v][0] - if not show_path: + if (not show_path): path = None else: path = [] @@ -1068,7 +1017,7 @@ def get_longest_path( return (path, lp) @staticmethod - def get_max_width(G, weight="weight", default_weight=1): + def get_max_width(G, weight='weight', default_weight=1): """ Get the antichain with the maximum "weighted" width of this DAG weight: float (for example, it could be RAM consumption in GB) @@ -1078,8 +1027,8 @@ def get_max_width(G, weight="weight", default_weight=1): for antichain in nx.antichains(G): t = 0 for n in antichain: - t += G.node[n].get(weight, default_weight) - if t > max_width: + t += G.nodes[n].get(weight, default_weight) + if (t > max_width): max_width = t return max_width @@ -1115,30 +1064,30 @@ def prune_antichains(antichains): todo = [] for antichain in antichains: todo.append(antichain) - todo.sort(key=lambda x: len(x), reverse=True) + todo.sort(key=lambda x : len(x), reverse=True) return todo @staticmethod - def label_schedule(G, weight="weight", topo_sort=None): + def label_schedule(G, weight='weight', topo_sort=None): """ for each node, label its start and end time """ - if topo_sort is None: + if (topo_sort is None): topo_sort = nx.topological_sort(G) for v in topo_sort: - gv = G.node[v] + gv = G.nodes[v] parents = list(G.predecessors(v)) - if len(parents) == 0: - gv["stt"] = 0 + if (len(parents) == 0): + gv['stt'] = 0 else: # get the latest end time of one of its parents ledt = -1 for parent in parents: - pedt = G.node[parent]["edt"] + G.adj[parent][v].get(weight, 0) - if pedt > ledt: + pedt = G.nodes[parent]['edt'] + G.adj[parent][v].get(weight, 0) + if (pedt > ledt): ledt = pedt - gv["stt"] = ledt - gv["edt"] = gv["stt"] + gv.get(weight, 0) + gv['stt'] = ledt + gv['edt'] = gv['stt'] + gv.get(weight, 0) @staticmethod def ganttchart_matrix(G, topo_sort=None): @@ -1146,24 +1095,23 @@ def ganttchart_matrix(G, topo_sort=None): Return a M (# of DROPs) by N (longest path length) matrix """ lpl = DAGUtil.get_longest_path(G, show_path=True) - # N = lpl[1] - (len(lpl[0]) - 1) + #N = lpl[1] - (len(lpl[0]) - 1) N = lpl[1] M = G.number_of_nodes() ma = np.zeros((M, N), dtype=np.int) - if topo_sort is None: + if (topo_sort is None): topo_sort = nx.topological_sort(G) for i, n in enumerate(topo_sort): - node = G.node[n] + node = G.nodes[n] try: - stt = node["stt"] - edt = node["edt"] + stt = node['stt'] + edt = node['edt'] except KeyError as ke: - raise SchedulerException( - "No schedule labels found: {0}".format(str(ke)) - ) - # print i, n, stt, edt + raise SchedulerException("No schedule labels found: {0}".\ + 
format(str(ke))) + #print i, n, stt, edt leng = edt - stt - if edt == stt: + if (edt == stt): continue try: ma[i, stt:edt] = np.ones((1, leng)) @@ -1171,7 +1119,7 @@ def ganttchart_matrix(G, topo_sort=None): logger.error("i, stt, edt, leng = %d, %d, %d, %d", i, stt, edt, leng) logger.error("N, M = %d, %d", M, N) raise - # print ma[i, :] + #print ma[i, :] return ma @staticmethod @@ -1180,26 +1128,20 @@ def import_metis(): import metis as mt except: pl = platform.platform() - if pl.startswith("Darwin"): # a clumsy way - ext = "dylib" + if (pl.startswith('Darwin')): # a clumsy way + ext = 'dylib' else: - ext = "so" # what about Microsoft??!! - os.environ["METIS_DLL"] = pkg_resources.resource_filename( - "dlg.dropmake", "lib/libmetis.{0}".format(ext) - ) # @UndefinedVariable + ext = 'so' # what about Microsoft??!! + os.environ["METIS_DLL"] = pkg_resources.resource_filename('dlg.dropmake', 'lib/libmetis.{0}'.format(ext)) # @UndefinedVariable import metis as mt - if not hasattr(mt, "_dlg_patched"): + if not hasattr(mt, '_dlg_patched'): mt._part_graph = mt.part_graph - def logged_part_graph(*args, **kwargs): - logger.info("Starting metis partitioning") + logger.info('Starting metis partitioning') start = time.time() ret = mt._part_graph(*args, **kwargs) # @UndefinedVariable - logger.info( - "Finished metis partitioning in %.3f [s]", time.time() - start - ) + logger.info('Finished metis partitioning in %.3f [s]', time.time() - start) return ret - mt.part_graph = logged_part_graph mt._dlg_patched = True return mt @@ -1214,71 +1156,55 @@ def build_dag_from_drops(drop_list, embed_drop=True, fake_super_root=False): """ # tw - task weight # dw - data weight / volume - key_dict = dict() # {oid : node_id} - drop_dict = dict() # {oid : drop} - out_bound_keys = ["streamingConsumers", "consumers", "outputs"] + key_dict = dict() # {oid : node_id} + drop_dict = dict() # {oid : drop} + out_bound_keys = ['streamingConsumers', 'consumers', 'outputs'] for i, drop in enumerate(drop_list): - oid = drop["oid"] - key_dict[oid] = i + 1 # starting from 1 + oid = drop['oid'] + key_dict[oid] = i + 1 # starting from 1 drop_dict[oid] = drop G = nx.DiGraph() for i, drop in enumerate(drop_list): - oid = drop["oid"] + oid = drop['oid'] myk = i + 1 tt = drop["type"] - if DropType.PLAIN == tt: + if (DropType.PLAIN == tt): # if (drop['nm'] == 'StreamNull'): # obk = 'streamingConsumers' # else: # obk = 'consumers' # outbound keyword tw = 0 dtp = 0 - elif DropType.APP == tt: - # obk = 'outputs' - tw = int(drop["tw"]) + elif (DropType.APP == tt): + #obk = 'outputs' + tw = int(drop['tw']) dtp = 1 elif DropType.SERVICE_APP == tt: - tw = int(drop["tw"]) + tw = int(drop['tw']) dtp = 1 else: - raise SchedulerException("Drop Type '{0}' not supported".format(tt)) - num_cpus = drop.get("num_cpus", 1) - if embed_drop: - G.add_node( - myk, - weight=tw, - text=drop["nm"], - dt=dtp, - drop_spec=drop, - num_cpus=num_cpus, - ) + raise SchedulerException("Drop Type '{0}' not supported".\ + format(tt)) + num_cpus = drop.get('num_cpus', 1) + if (embed_drop): + G.add_node(myk, weight=tw, text=drop['nm'], dt=dtp, + drop_spec=drop, num_cpus=num_cpus) else: - G.add_node(myk, weight=tw, text=drop["nm"], dt=dtp, num_cpus=num_cpus) + G.add_node(myk, weight=tw, text=drop['nm'], dt=dtp, + num_cpus=num_cpus) for obk in out_bound_keys: if obk in drop: for oup in drop[obk]: - if DropType.PLAIN == tt: - G.add_weighted_edges_from( - [(myk, key_dict[oup], int(drop["dw"]))] - ) - elif DropType.APP == tt: - G.add_weighted_edges_from( - [(myk, key_dict[oup], 
int(drop_dict[oup].get("dw", 5)))] - ) - - if fake_super_root: - super_root = dropdict( - {"oid": "-92", "type": DropType.PLAIN, "storage": "null"} - ) + if (DropType.PLAIN == tt): + G.add_weighted_edges_from([(myk, key_dict[oup], int(drop['dw']))]) + elif (DropType.APP == tt): + G.add_weighted_edges_from([(myk, key_dict[oup], int(drop_dict[oup].get('dw', 5)))]) + + if (fake_super_root): + super_root = dropdict({'oid':'-92', "type": DropType.PLAIN, 'storage':'null'}) super_k = len(drop_list) + 1 - G.add_node( - super_k, - weight=0, - dtp=0, - drop_spec=super_root, - num_cpus=0, - text="fake_super_root", - ) + G.add_node(super_k, weight=0, dtp=0, drop_spec=super_root, + num_cpus=0, text='fake_super_root') for oup in get_roots(drop_list): G.add_weighted_edges_from([(super_k, key_dict[oup], 1)]) @@ -1291,9 +1217,9 @@ def metis_part(G, num_partitions): Use metis binary executable (instead of library) This is used only for testing when libmetis halts unexpectedly """ - outf = "/tmp/mm" + outf = '/tmp/mm' lines = [] - part_id_line_dict = dict() # {part_id: line_num} + part_id_line_dict = dict() # {part_id: line_num} line_part_id_dict = dict() for i, n in enumerate(G.nodes()): part_id_line_dict[n] = i + 1 @@ -1302,14 +1228,14 @@ def metis_part(G, num_partitions): for i, node in enumerate(G.nodes(data=True)): n = node[0] line = [] - line.append(str(node[1]["wkl"])) - line.append(str(node[1]["eff"])) + line.append(str(node[1]['wkl'])) + line.append(str(node[1]['eff'])) for m in G.neighbors(n): line.append(str(part_id_line_dict[m])) - a = G[m][n]["weight"] - if 0 == a: + a = G[m][n]['weight'] + if (0 == a): logger.debug("G[%d][%d]['weight'] = %f", m, n, a) - line.append(str(G[m][n]["weight"])) + line.append(str(G[m][n]['weight'])) lines.append(" ".join(line)) header = "{0} {1} 011 2".format(len(G.nodes()), len(G.edges())) @@ -1317,15 +1243,14 @@ def metis_part(G, num_partitions): with open(outf, "w") as f: f.write("\n".join(lines)) - if __name__ == "__main__": G = nx.DiGraph() - G.add_weighted_edges_from([(4, 3, 1), (3, 2, 4), (2, 1, 2), (5, 3, 1)]) - G.add_weighted_edges_from([(3, 6, 5), (6, 7, 2)]) - G.add_weighted_edges_from([(9, 12, 2)]) # testing independent nodes - G.node[3]["weight"] = 65 + G.add_weighted_edges_from([(4,3,1), (3,2,4), (2,1,2), (5,3,1)]) + G.add_weighted_edges_from([(3,6,5), (6,7,2)]) + G.add_weighted_edges_from([(9,12,2)]) # testing independent nodes + G.nodes[3]['weight'] = 65 print(G.pred[12].items()) - print(G.node[G.predecessors(12)[0]]) + print(G.nodes[G.predecessors(12)[0]]) # print "prepre" # print len(G.pred[7].items()) @@ -1345,11 +1270,7 @@ def metis_part(G, num_partitions): print("The longest path is {0} with a length of {1}".format(lp[0], lp[1])) mw = DAGUtil.get_max_width(G) dop = DAGUtil.get_max_dop(G) - print( - "The max (weighted) width = {0}, and the max degree of parallelism = {1}".format( - mw, dop - ) - ) + print("The max (weighted) width = {0}, and the max degree of parallelism = {1}".format(mw, dop)) DAGUtil.label_schedule(G) print(G.nodes(data=True)) gantt_matrix = DAGUtil.ganttchart_matrix(G) @@ -1360,4 +1281,4 @@ def metis_part(G, num_partitions): # print sch_mat # print sch_mat.shape - # print DAGUtil.prune_antichains([[], [64], [62], [62, 64], [61], [61, 64], [61, 62], [61, 62, 64], [5], [1]]) + #print DAGUtil.prune_antichains([[], [64], [62], [62, 64], [61], [61, 64], [61, 62], [61, 62, 64], [5], [1]]) diff --git a/daliuge-translator/dlg/dropmake/utils/antichains.py b/daliuge-translator/dlg/dropmake/utils/antichains.py index 459d493c1..e3247797d 
100644 --- a/daliuge-translator/dlg/dropmake/utils/antichains.py +++ b/daliuge-translator/dlg/dropmake/utils/antichains.py @@ -34,33 +34,33 @@ import networkx as nx - -def _create_split_graph(dag, w_attr="weight"): +def _create_split_graph(dag, w_attr='weight'): """ Given a normal DiGraph, create its equivalent split graph """ bpg = nx.DiGraph() for el in dag.nodes(data=True): - xi = "{0}_x".format(el[0]) - yi = "{0}_y".format(el[0]) - # print(el) - bpg.add_edge("s", xi, capacity=el[1].get(w_attr, 1), weight=0) + xi = '{0}_x'.format(el[0]) + yi = '{0}_y'.format(el[0]) + #print(el) + bpg.add_edge('s', xi, capacity=el[1].get(w_attr, 1), weight=0) bpg.add_edge(xi, yi, capacity=sys.maxsize, weight=1) - bpg.add_edge(yi, "t", capacity=el[1].get(w_attr, 1), weight=0) + bpg.add_edge(yi, 't', capacity=el[1].get(w_attr, 1), weight=0) el_des = nx.descendants(dag, el[0]) el_pred = nx.ancestors(dag, el[0]) for udown in el_des: - bpg.add_edge(xi, "{0}_y".format(udown), capacity=sys.maxsize, weight=0) + bpg.add_edge(xi, '{0}_y'.format(udown), + capacity=sys.maxsize, weight=0) for uup in el_pred: - bpg.add_edge("{0}_x".format(uup), yi, capacity=sys.maxsize, weight=0) + bpg.add_edge('{0}_x'.format(uup), yi, + capacity=sys.maxsize, weight=0) return bpg - def _get_pi_solution(split_graph): - """ + """ 1. create H (admissable graph) based on Section 3 http://fmdb.cs.ucla.edu/Treports/930014.pdf @@ -72,41 +72,40 @@ def _get_pi_solution(split_graph): 4. calculate Pi based on Section 3 again """ - # Step 1 - H = nx.DiGraph() - H.add_nodes_from(split_graph) - for ed in split_graph.edges(data=True): - Cxy = ed[2].get("capacity", sys.maxsize) - Axy = ed[2]["weight"] - if Axy == 0 and Cxy > 0: - H.add_edge(ed[0], ed[1], capacity=Cxy, weight=Axy) - - # Step 2 - flow_value, flow_dict = nx.maximum_flow(H, "s", "t") - - # Step 3 - R = nx.DiGraph() - R.add_nodes_from(H) - for ed in H.edges(data=True): - Xij = flow_dict[ed[0]][ed[1]] - Uij = ed[2].get("capacity", sys.maxsize) - Cij = ed[2]["weight"] - if (Uij - Xij) > 0: - R.add_edge(ed[0], ed[1], weight=Cij) - if Xij > 0: - R.add_edge(ed[1], ed[0], weight=-1 * Cij) - - # Step 4 - pai = dict() - for n in R.nodes(): - if nx.has_path(R, "s", n): - pai[n] = 0 - else: - pai[n] = 1 - return pai - - -def get_max_weighted_antichain(dag, w_attr="weight"): + # Step 1 + H = nx.DiGraph() + H.add_nodes_from(split_graph) + for ed in split_graph.edges(data=True): + Cxy = ed[2].get('capacity', sys.maxsize) + Axy = ed[2]['weight'] + if (Axy == 0 and Cxy > 0): + H.add_edge(ed[0], ed[1], capacity=Cxy, weight=Axy) + + # Step 2 + flow_value, flow_dict = nx.maximum_flow(H, 's', 't') + + # Step 3 + R = nx.DiGraph() + R.add_nodes_from(H) + for ed in H.edges(data=True): + Xij = flow_dict[ed[0]][ed[1]] + Uij = ed[2].get('capacity', sys.maxsize) + Cij = ed[2]['weight'] + if (Uij - Xij) > 0: + R.add_edge(ed[0], ed[1], weight=Cij) + if (Xij > 0): + R.add_edge(ed[1], ed[0], weight=-1 * Cij) + + # Step 4 + pai = dict() + for n in R.nodes(): + if (nx.has_path(R, 's', n)): + pai[n] = 0 + else: + pai[n] = 1 + return pai + +def get_max_weighted_antichain(dag, w_attr='weight'): """ Given a a nextworkx DiGraph `dag`, return a tuple. 
The first element is the length of the max_weighted_antichain @@ -119,31 +118,30 @@ def get_max_weighted_antichain(dag, w_attr="weight"): bpg = _create_split_graph(dag, w_attr=w_attr) pai = _get_pi_solution(bpg) - w_antichain_len = 0 # weighted antichain length + w_antichain_len = 0 #weighted antichain length antichain_names = [] for h in range(2): for nd in bpg.nodes(): - if nd.endswith("_x"): - y_nd = nd.split("_x")[0] + "_y" - if (1 - pai[nd] + pai["s"] == h) and (pai[y_nd] - pai[nd] == 1): - w_antichain_len += bpg.adj["s"][nd]["capacity"] - # print(' *** %d' % bpg.edge['s'][nd]['capacity']) + if (nd.endswith('_x')): + y_nd = nd.split('_x')[0] + '_y' + if ((1 - pai[nd] + pai['s'] == h) and + (pai[y_nd] - pai[nd] == 1)): + w_antichain_len += bpg.adj['s'][nd]['capacity'] + #print(' *** %d' % bpg.edge['s'][nd]['capacity']) antichain_names.append(nd) return w_antichain_len, antichain_names - def create_small_seq_graph(): G = nx.DiGraph() G.add_edge(1, 2) G.add_edge(2, 3) - G.node[1]["weight"] = 5 - G.node[2]["weight"] = 4 - G.node[3]["weight"] = 7 - # print("") + G.nodes[1]['weight'] = 5 + G.nodes[2]['weight'] = 4 + G.nodes[3]['weight'] = 7 + #print("") return G, 7 - def create_medium_seq_graph(): G = create_small_seq_graph()[0] G.add_edge(3, 4) @@ -151,44 +149,39 @@ def create_medium_seq_graph(): G.add_edge(4, 6) G.add_edge(5, 6) G.add_edge(6, 7) - G.node[4]["weight"] = 3 - G.node[5]["weight"] = 2 - G.node[6]["weight"] = 6 - G.node[7]["weight"] = 1 + G.nodes[4]['weight'] = 3 + G.nodes[5]['weight'] = 2 + G.nodes[6]['weight'] = 6 + G.nodes[7]['weight'] = 1 return G, 7 - def create_small_parral_graph(): G = nx.DiGraph() G.add_edge(1, 2) G.add_edge(1, 3) - G.node[1]["weight"] = 5 - G.node[2]["weight"] = 6 - G.node[3]["weight"] = 7 + G.nodes[1]['weight'] = 5 + G.nodes[2]['weight'] = 6 + G.nodes[3]['weight'] = 7 return G, 13 - def create_medium_parral_graph(): G = create_small_parral_graph()[0] G.add_edge(2, 4) G.add_edge(2, 5) - G.node[4]["weight"] = 3 - G.node[5]["weight"] = 4 + G.nodes[4]['weight'] = 3 + G.nodes[5]['weight'] = 4 return G, 14 - if __name__ == "__main__": - gs = [ - create_small_seq_graph(), - create_medium_seq_graph(), - create_small_parral_graph(), - create_medium_parral_graph(), - ] + gs = [create_small_seq_graph(), + create_medium_seq_graph(), + create_small_parral_graph(), + create_medium_parral_graph()] for g, lt in gs: w, l = get_max_weighted_antichain(g) print(w) print(l) - if w != lt: + if (w != lt): print("Calculated %d != %d, which is the ground-truth" % (w, lt)) - print("") + print('') diff --git a/daliuge-translator/setup.py b/daliuge-translator/setup.py index b7987ff57..9826a92ae 100644 --- a/daliuge-translator/setup.py +++ b/daliuge-translator/setup.py @@ -98,7 +98,7 @@ def package_files(directory): "daliuge-common==%s" % (VERSION,), "metis>=0.2a3", # We are not compatible with networkx 2.4 yet, so we need to constrain that - "networkx<2.4", + "networkx", "numpy", "psutil", "pyswarm",