From 2d16cd726c733cf9f6b33ca79e307acd07cb8185 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 27 Apr 2023 18:50:57 -0400 Subject: [PATCH 01/32] some refactoring of spec support --- deps/pash | 2 +- parallel-orch/partial_program_order.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/pash b/deps/pash index cdb249cd..3f875e27 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit cdb249cd774b78966306e02610c544d3c48c184a +Subproject commit 3f875e27bd4f325dba71c16bba765411bf5e9bb8 diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 50656c17..8bc80001 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -582,7 +582,7 @@ def log_committed_cmd_state(self): logging.info("--------------------------------------") - +## TODO: Try to move those to PaSh and import them here def parse_cmd_from_file(file_path: str) -> str: with open(file_path) as f: cmd = f.read() From 57a0dc422ce8084e38e7dffc7a8765b265b37947 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 27 Apr 2023 19:45:38 -0400 Subject: [PATCH 02/32] Add some initial scaffoling for loop support --- deps/pash | 2 +- loop.sh | 5 +++ parallel-orch/partial_program_order.py | 44 ++++++++++++++++++++++---- parallel-orch/scheduler_server.py | 3 ++ 4 files changed, 46 insertions(+), 8 deletions(-) create mode 100644 loop.sh diff --git a/deps/pash b/deps/pash index 3f875e27..8f4a51aa 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit 3f875e27bd4f325dba71c16bba765411bf5e9bb8 +Subproject commit 8f4a51aacb1371c30896b74b5e3398f3a72dc115 diff --git a/loop.sh b/loop.sh new file mode 100644 index 00000000..7fcfdf0b --- /dev/null +++ b/loop.sh @@ -0,0 +1,5 @@ +echo hi +for i in 1 2 3; do + ## Until we figure out variables the only way to determine that this run twice is by measuring the executed time + sleep 2 +done \ No newline at end of file diff --git 
a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 8bc80001..f50c6632 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -23,10 +23,11 @@ def __str__(self): return f'CompletedNodeInfo(ec:{self.get_exit_code()}, vf:{self.get_variable_file()}, stdout:{self.get_stdout_file()})' class Node: - def __init__(self, id, cmd): + def __init__(self, id, cmd, loop_context): self.cmd = cmd self.id = id self.cmd_no_redir = trace.remove_command_redir(self.cmd) + self.loop_context = loop_context def __str__(self): # return f"ID: {self.id}\nCMD: {self.cmd}\nR: {self.read_set}\nW: {self.write_set}" @@ -42,6 +43,9 @@ def get_cmd(self) -> str: def get_cmd_no_redir(self) -> str: return self.cmd_no_redir + def in_loop(self) -> bool: + return len(self.loop_context) > 0 + ## Note: This information is valid only after a node is committed. ## It might be set even before that, but it should only be retrieved when ## a node is committed. @@ -419,7 +423,8 @@ def schedule_all_workset_non_frontier_cmds(self): if not self.is_frontier(node_id)] for cmd_id in non_frontier_ids: # We also need for a cmd to not be waiting to be resolved. 
- if not cmd_id in self.commands_currently_executing and not cmd_id in self.waiting_to_be_resolved: + if not cmd_id in self.commands_currently_executing and \ + not cmd_id in self.waiting_to_be_resolved: self.speculate_cmd_non_blocking(cmd_id) def run_all_frontier_cmds(self): @@ -592,6 +597,23 @@ def parse_edge_line(line: str) -> "tuple[int, int]": from_str, to_str = line.split(" -> ") return (int(from_str), int(to_str)) +def parse_loop_context_line(line: str) -> "tuple[int, list[int]]": + node_id, loop_contexts_raw = line.split("-loop_ctx-") + if loop_contexts_raw != "": + loop_contexts_str = loop_contexts_raw.split(",") + loop_contexts = [int(loop_ctx) for loop_ctx in loop_contexts_str] + else: + loop_contexts = [] + return int(node_id), loop_contexts + +def parse_loop_contexts(lines): + loop_contexts = {} + for line in lines: + node_id, loop_ctx = parse_loop_context_line(line) + loop_contexts[node_id] = loop_ctx + + return loop_contexts + def parse_partial_program_order_from_file(file_path: str) -> PartialProgramOrder: with open(file_path) as f: raw_lines = f.readlines() @@ -600,23 +622,31 @@ def parse_partial_program_order_from_file(file_path: str) -> PartialProgramOrder lines = [line.rstrip() for line in raw_lines if not line.startswith("#")] - ## The first line is the directory in which cmd_files are + ## The directory in which cmd_files are cmds_directory = str(lines[0]) logging.debug(f'Cmds are stored in: {cmds_directory}') - ## The last line is the number of nodes - number_of_nodes = int(lines[-1]) + ## The number of nodes + number_of_nodes = int(lines[1]) logging.debug(f'Number of po cmds: {number_of_nodes}') + ## The loop context for each node + loop_context_start=2 + loop_context_end=number_of_nodes+2 + loop_context_lines = lines[loop_context_start:loop_context_end] + loop_contexts = parse_loop_contexts(loop_context_lines) + logging.debug(f'Loop contexts: {loop_contexts}') + ## The rest of the lines are edge_lines - edge_lines = lines[1:-1] + edge_lines 
= lines[loop_context_end:] logging.debug(f'Edges: {edge_lines}') nodes = {} for i in range(number_of_nodes): file_path = f'{cmds_directory}/{i}' cmd = parse_cmd_from_file(file_path) - nodes[i] = Node(i, cmd) + loop_ctx = loop_contexts[i] + nodes[i] = Node(i, cmd, loop_ctx) edges = {i : [] for i in range(number_of_nodes)} for edge_line in edge_lines: diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index f174e9e1..5abfa288 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -87,6 +87,9 @@ def handle_wait(self, input_cmd: str, connection): node_id = int(input_cmd.split(":")[1].rstrip()) logging.debug(f'Scheduler: Received wait for node_id: {node_id}') + ## TODO: If node is in a loop, then start executing it now even though + ## it is done executing. + ## If the node_id is already committed, just return its exit code if node_id in self.partial_program_order.get_committed(): logging.debug(f'Node: {node_id} found in committed, responding immediately!') From ca702f30d545c3fa5da9ae25dcfc91cc4de49967 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Tue, 2 May 2023 12:13:01 -0400 Subject: [PATCH 03/32] Fix an issue in installation --- scripts/install_deps_ubuntu20.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/install_deps_ubuntu20.sh b/scripts/install_deps_ubuntu20.sh index ebb91baf..63e295f8 100755 --- a/scripts/install_deps_ubuntu20.sh +++ b/scripts/install_deps_ubuntu20.sh @@ -1,5 +1,8 @@ #!/bin/bash +export PASH_SPEC_TOP=${PASH_SPEC_TOP:-$(git rev-parse --show-toplevel --show-superproject-working-tree)} +export PASH_TOP=${PASH_TOP:-$PASH_SPEC_TOP/deps/pash} + ## Install Riker's dependencies sudo apt-get update sudo apt install -y make clang llvm git gcc python3-cram file graphviz From 80a225563be178489c0eed45ddfc4484842dffde Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Tue, 2 May 2023 12:13:15 -0400 Subject: [PATCH 04/32] Add comments on how to address the loop 
issue --- parallel-orch/partial_program_order.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index f50c6632..557587c3 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -84,6 +84,7 @@ def __init__(self, nodes, edges): # TODO: consider changing values to sets instead of lists self.adjacency = edges self.init_inverse_adjacency() + ## TODO: KK: Is it OK if we modify adjacency lists on the fly while processing the partial-order? ## self.committed is an add-only set, we never remove self.committed = set() ## Nodes that are in the frontier can only move to committed @@ -338,6 +339,11 @@ def rerun_stopped(self): new_stopped.remove(cmd_id) self.stopped = new_stopped + ## KK 2023-05-02: We should not be able to step/execute/speculate loop nodes, instead + ## the only action we should be able to do to them is to unroll them, + ## by creating iterations before them in the partial order. + ## The loop nodes then act as barriers that cannot be committed, executed (or put in the frontier) + ## and separate the already committed with the future partial order. def step_forward(self, old_speculated, old_committed): logging.debug(" > Committing frontier") self.commit_frontier() @@ -347,6 +353,8 @@ def step_forward(self, old_speculated, old_committed): self.populate_to_be_resolved_dict(old_committed) # Add frontier commands to committed set + ## TODO: Loop nodes should not be committed until we receive a wait for the node after them. + ## We don't know if they are done executing until then. def commit_frontier(self): # Second condition below may be unecessary for frontier_node in self.frontier: @@ -571,6 +579,7 @@ def populate_to_be_resolved_dict(self, old_committed): def get_currently_executing(self) -> list: return sorted(list(self.commands_currently_executing.keys())) + ## KK 2023-05-02 What does this function do? 
def save_commit_state_of_cmd(self, cmd_id): self.committed_order.append(cmd_id) self.commit_state[cmd_id] = set(self.committed) - set(self.to_be_resolved[cmd_id]) From c135d1dfdf5369ea4af7086676b5801d66e39f5a Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Tue, 2 May 2023 12:14:23 -0400 Subject: [PATCH 05/32] push pash --- deps/pash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/pash b/deps/pash index 8f4a51aa..c50d7e86 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit 8f4a51aacb1371c30896b74b5e3398f3a72dc115 +Subproject commit c50d7e8650413f62f5eff85b331bc8db9f3afed4 From ece0f171b84c9bee1d7ea5a9cf74e908e8c83338 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Tue, 2 May 2023 12:31:16 -0400 Subject: [PATCH 06/32] Add a comment about how to unroll a loop concretely --- parallel-orch/partial_program_order.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 557587c3..bc62ab59 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -344,6 +344,24 @@ def rerun_stopped(self): ## by creating iterations before them in the partial order. ## The loop nodes then act as barriers that cannot be committed, executed (or put in the frontier) ## and separate the already committed with the future partial order. + ## + ## Note: We have to be careful when unrolling loops to unroll a complete iteration to start with + ## (to not have to deal with partial order relations between commands of different iterations). 
+ ## + ## Concrete pseudocode: + ## def unroll(self, loop_id): + ## ## Finds all of the nodes in the same loop in the partial order + ## sub_po = self.find_loop_sub_partial_order(loop_id) + ## ## Find the previous node + ## previous_ids = self.find_prev_nodes(sub_po.first) + ## ## Create an iteration version (no loop nodes) of the sub_po + ## ## (be careful to not eliminate nested loop nodes) + ## sub_po_iter = create_iter(sub_po) + ## ## add the iteration between the loop and its previous node + ## self.add_po_between(sub_po_iter, previous_ids, sub_po.first) + ## + ## We need to determine when to call unroll. For now we can just do it if the frontier is empty + ## (which means that the next node of the frontier is a loop node). def step_forward(self, old_speculated, old_committed): logging.debug(" > Committing frontier") self.commit_frontier() From bfc9877f5403eaae126b2eed809913170914f3ad Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Tue, 2 May 2023 17:11:28 -0400 Subject: [PATCH 07/32] follow latest pash on loop iterations --- deps/pash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/pash b/deps/pash index c50d7e86..c7a8c24e 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit c50d7e8650413f62f5eff85b331bc8db9f3afed4 +Subproject commit c7a8c24e88768f6a44aa4470c7fe6fbf2a68eb69 From 198a493a84c14b3fd7ba0a74085e9518854719eb Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 3 May 2023 11:34:08 -0400 Subject: [PATCH 08/32] checkpoint --- parallel-orch/scheduler_server.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 5abfa288..8b0a3938 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -80,11 +80,22 @@ def handle_init(self, input_cmd: str): logging.debug(f'To be resolved sets per node:') logging.debug(self.partial_program_order.to_be_resolved) + def 
__parse_wait(self, input_cmd: str) -> int: + try: + node_id = int(input_cmd.split(":")[1].rstrip()) + # components = input_cmd.rstrip().split("|") + # command_id = int(components[0].split(":")[1]) + # exit_code = int(components[1].split(":")[1]) + # sandbox_dir = components[2].split(":")[1] + return node_id + except: + raise Exception(f'Parsing failure for line: {input_cmd}') + def handle_wait(self, input_cmd: str, connection): assert(input_cmd.startswith("Wait")) ## We have received this message by the JIT, which waits for a node_id to ## finish execution. - node_id = int(input_cmd.split(":")[1].rstrip()) + node_id = self.__parse_wait(input_cmd) logging.debug(f'Scheduler: Received wait for node_id: {node_id}') ## TODO: If node is in a loop, then start executing it now even though From 84dbdb2cb4a3aa97d566f8e72dbb19683afc2b29 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 3 May 2023 11:34:15 -0400 Subject: [PATCH 09/32] checkpoint --- deps/pash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/pash b/deps/pash index c7a8c24e..781e1752 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit c7a8c24e88768f6a44aa4470c7fe6fbf2a68eb69 +Subproject commit 781e17525c005bff6950c99301ae2c04d8f341fb From 057fe54e7a057b265ea994237dddb2f17c92ff7f Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 3 May 2023 16:10:49 -0400 Subject: [PATCH 10/32] Correctly pass loop counters to scheduler from pash-jit --- deps/pash | 2 +- parallel-orch/scheduler_server.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/deps/pash b/deps/pash index 781e1752..db97991a 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit 781e17525c005bff6950c99301ae2c04d8f341fb +Subproject commit db97991a67e619b1cc83c4afca78cdf1422d76c1 diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 8b0a3938..31116cc3 100644 --- a/parallel-orch/scheduler_server.py +++ 
b/parallel-orch/scheduler_server.py @@ -82,12 +82,11 @@ def handle_init(self, input_cmd: str): def __parse_wait(self, input_cmd: str) -> int: try: - node_id = int(input_cmd.split(":")[1].rstrip()) - # components = input_cmd.rstrip().split("|") - # command_id = int(components[0].split(":")[1]) - # exit_code = int(components[1].split(":")[1]) - # sandbox_dir = components[2].split(":")[1] - return node_id + node_id_component, loop_iter_counter_component = input_cmd.rstrip().split("|") + node_id = int(node_id_component.split(":")[1].rstrip()) + loop_counters_str = loop_iter_counter_component.split(":")[1].rstrip() + loop_counters = loop_counters_str.split("-") + return node_id, loop_counters except: raise Exception(f'Parsing failure for line: {input_cmd}') @@ -95,8 +94,8 @@ def handle_wait(self, input_cmd: str, connection): assert(input_cmd.startswith("Wait")) ## We have received this message by the JIT, which waits for a node_id to ## finish execution. - node_id = self.__parse_wait(input_cmd) - logging.debug(f'Scheduler: Received wait for node_id: {node_id}') + node_id, loop_counters = self.__parse_wait(input_cmd) + logging.debug(f'Scheduler: Received wait for node_id: {node_id} with loop counters: {loop_counters}') ## TODO: If node is in a loop, then start executing it now even though ## it is done executing. 
From bb9be4e28b978f00f3048b1c4a60ef42ab6c9ea8 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 4 May 2023 13:31:39 -0400 Subject: [PATCH 11/32] add some assertions to check that loop nodes cannot be executed --- parallel-orch/partial_program_order.py | 80 ++++++++++++++++++++++---- parallel-orch/scheduler_server.py | 6 +- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index ef7d5f7b..658552c9 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -85,7 +85,10 @@ def __init__(self, nodes, edges): self.adjacency = edges self.init_inverse_adjacency() ## TODO: KK: Is it OK if we modify adjacency lists on the fly while processing the partial-order? + ## TODO: Remember to modify inverse_adjacency ## self.committed is an add-only set, we never remove + ## TODO: For loop modify committed, workset, frontier, stopped + ## TODO: Add assertions that committed etc do not contain loop nodes self.committed = set() ## Nodes that are in the frontier can only move to committed self.frontier = self.get_source_nodes() @@ -118,8 +121,16 @@ def get_source_nodes(self) -> list: sources.add(to_id) return list(sources) + def init_partial_order(self): + self.init_workset() + logging.debug(f'Initialized workset') + self.populate_to_be_resolved_dict([]) + logging.debug(f'To be resolved sets per node:') + logging.debug(self.to_be_resolved) + assert(self.valid()) + def init_workset(self): - self.workset = self.get_all_non_committed() + self.workset = self.get_all_non_committed_standard_nodes() ## Check if the partial order is done def is_completed(self) -> bool: @@ -141,15 +152,32 @@ def init_inverse_adjacency(self): self.inverse_adjacency[to_id].append(from_id) # ## TODO: (When there is time) Define a function that checks that the graph is valid + ## TODO: Call valid and add assertiosn for loops here. 
def valid(self): + self.log_partial_program_order_info() + valid1 = self.loop_nodes_valid() + ## TODO: Fix the checks below because they do not work currently ## TODO: Check that committed is prefix closed w.r.t partial order - self.all_frontier_nodes_after_committed_nodes() - self.frontier_and_committed_intersect() - return True + # self.all_frontier_nodes_after_committed_nodes() + # self.frontier_and_committed_intersect() + return valid1 + + ## Checks if loop nodes are all valid, i.e., that there are no loop nodes handled like normal ones, + ## e.g., in workset, committed, etc + def loop_nodes_valid(self): + forbidden_sets = self.get_committed() + \ + self.get_frontier() + \ + self.get_workset() + \ + list(self.stopped) + \ + list(self.commands_currently_executing.keys()) + loop_nodes_in_forbidden_sets = [node_id for node_id in forbidden_sets + if self.is_loop_node(node_id)] + return len(loop_nodes_in_forbidden_sets) == 0 # Check if all frontier nodes are after committed nodes def all_frontier_nodes_after_committed_nodes(self): - return max(self.committed) < min(self.frontier) + ## TODO: Make this check a proper predecessor check + return max(self.get_committed()) < min(self.frontier) # Checks if frontier and committed intersect def frontier_and_committed_intersect(self): @@ -161,13 +189,26 @@ def __len__(self): def get_node(self, node_id:int) -> Node: return self.nodes[node_id] - def get_all_non_committed(self) -> list: + def get_all_non_committed(self) -> "list[int]": return self.get_transitive_closure(self.frontier) + + def is_loop_node(self, node_id:int) -> bool: + return self.get_node(node_id).in_loop() + + ## Only keeps standard (non-loop) nodes + def filter_standard_nodes(self, node_ids: "list[int]") -> "list[int]": + return [node_id for node_id in node_ids + if not self.is_loop_node(node_id)] - def get_next(self, node_id:int) -> list: + ## Returns all non committed non-loop nodes + def get_all_non_committed_standard_nodes(self) -> "list[int]": + 
all_non_committed = self.get_all_non_committed() + return self.filter_standard_nodes(all_non_committed) + + def get_next(self, node_id:int) -> "list[int]": return self.adjacency[node_id] - def get_transitive_closure(self, target_node_ids:list) -> list: + def get_transitive_closure(self, target_node_ids:"list[int]") -> "list[int]": all_next_transitive = set(target_node_ids) next_work = target_node_ids.copy() while len(next_work) > 0: @@ -220,6 +261,8 @@ def get_node_id_from_cmd_no_redir(self, cmd_no_redir: str) -> int: def cmd_can_be_resolved(self, node_id: int) -> bool: # If the command we evaluate has no earlier command currently executing, it can be resolved this round if len(self.get_currently_executing()) > 0: + ## TODO: Modify this to be a proper predecessor so that it works with newly + ## added loop nodes (proper predecessor does not happen with comparing indexes) return node_id <= min(self.get_currently_executing()) else: return True @@ -308,7 +351,7 @@ def resolve_dependencies_continuous_and_move_frontier(self, new_node_id): self.step_forward(old_committed) # self.log_partial_program_order_info() - return self.committed - old_committed + return set(self.get_committed()) - old_committed def rerun_stopped(self): new_stopped = self.stopped.copy() @@ -358,6 +401,7 @@ def step_forward(self, old_committed): ## We don't know if they are done executing until then. 
def commit_frontier(self): # Second condition below may be unecessary + logging.debug(f'Frontier: {self.frontier}') for frontier_node in self.frontier: if frontier_node not in self.workset: self.save_commit_state_of_cmd(frontier_node) @@ -367,7 +411,7 @@ def move_frontier_forward(self): new_frontier = [] for node in self.frontier: if node not in self.workset: - to_add_in_frontier = self.get_next_non_speculated(node) + to_add_in_frontier = self.get_next_standard_non_speculated(node) new_frontier.extend(to_add_in_frontier) logging.trace(f"FrontierAdd|{','.join(str(node_id) for node_id in to_add_in_frontier)}") # If node is being executed again, we cannot progress further @@ -376,12 +420,23 @@ def move_frontier_forward(self): logging.trace(f"FrontierAdd|{node}") self.frontier = new_frontier + def get_next_standard_non_speculated(self, start: int) -> "list[int]": + next_non_speculated = self.get_next_non_speculated(start) + return self.filter_standard_nodes(next_non_speculated) + def get_next_non_speculated(self, start): traversal_workset = self.get_next(start) next_non_speculated = [] while len(traversal_workset) > 0: node_id = traversal_workset.pop() - if node_id not in self.get_currently_executing() and node_id not in self.get_committed() and node_id not in self.stopped and node_id not in self.waiting_to_be_resolved and node_id not in self.workset: + ## KK 2023-05-04: Why is this happening in a get_next_non_speculated_traversal? 
+ ## TODO: Move this outside in some effectful method + if node_id not in self.get_currently_executing() \ + and node_id not in self.get_committed() \ + and node_id not in self.stopped \ + and node_id not in self.waiting_to_be_resolved \ + and node_id not in self.workset\ + and not self.is_loop_node(node_id): self.save_commit_state_of_cmd(node_id) self.committed.add(node_id) traversal_workset.extend(self.get_next(node_id)) @@ -425,8 +480,10 @@ def has_forward_dependency(self, first_id, second_id): ## TODO: Eventually, in the future, let's add here some form of limit def schedule_work(self, limit=0): + # self.log_partial_program_order_info() self.run_all_frontier_cmds() self.schedule_all_workset_non_frontier_cmds() + assert(self.valid()) def schedule_all_workset_non_frontier_cmds(self): non_frontier_ids = [node_id for node_id in self.get_workset() @@ -514,6 +571,7 @@ def command_execution_completed(self, node_id: int, riker_exit_code:int, sandbox logging.trace(f"Commit|"+",".join(str(node_id) for node_id in to_commit)) self.commit_cmd_workspaces(to_commit) # self.print_cmd_stderr(stderr) + assert(self.valid()) def print_cmd_stderr(self, stderr): # stdout.seek(0) diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 9fd42cb8..f08022d4 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -67,11 +67,7 @@ def handle_init(self, input_cmd: str): partial_order_file = input_cmd.split(":")[1].rstrip() logging.debug(f'Scheduler: Received partial_order_file: {partial_order_file}') self.partial_program_order = parse_partial_program_order_from_file(partial_order_file) - self.partial_program_order.init_workset() - logging.debug(f'Parsed partial program order:') - self.partial_program_order.populate_to_be_resolved_dict([]) - logging.debug(f'To be resolved sets per node:') - logging.debug(self.partial_program_order.to_be_resolved) + self.partial_program_order.init_partial_order() def __parse_wait(self, 
input_cmd: str) -> int: try: From 52ad1d053e4b53462013526f0c4c552192b035c8 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 4 May 2023 15:45:14 -0400 Subject: [PATCH 12/32] Push a checkpoint before doing the big change of changing all NodeIds to their own class --- parallel-orch/partial_program_order.py | 100 ++++++++++++++++++++++++- parallel-orch/scheduler_server.py | 7 +- 2 files changed, 104 insertions(+), 3 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 658552c9..b871322f 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -22,6 +22,28 @@ def get_stdout_file(self): def __str__(self): return f'CompletedNodeInfo(ec:{self.get_exit_code()}, vf:{self.get_variable_file()}, stdout:{self.get_stdout_file()})' +class NodeId: + def __init__(self, id: str, loop_iters=None): + self.id = id + self.loop_iters = loop_iters + + def __hash__(self): + return hash((self.id, self.loop_iters)) + + def __eq__(self, other): + if not len(self.loop_iters) == len(other.loop_iters): + return False + for i in range(len(self.loop_iters)): + if not self.loop_iters[i] == other.loop_iters[i]: + return False + return self.id == other.id + + def __ne__(self, other): + # Not strictly necessary, but to avoid having both x==y and x!=y + # True at the same time + return not(self == other) + + class Node: def __init__(self, id, cmd, loop_context): self.cmd = cmd @@ -43,6 +65,9 @@ def get_cmd(self) -> str: def get_cmd_no_redir(self) -> str: return self.cmd_no_redir + def get_loop_context(self) -> "list[int]": + return self.loop_context + def in_loop(self) -> bool: return len(self.loop_context) > 0 @@ -121,6 +146,36 @@ def get_source_nodes(self) -> list: sources.add(to_id) return list(sources) + ## This returns all previous nodes of a sub partial order + def get_sub_po_prev_nodes(self, node_ids: "list[int]") -> "list[int]": + # assert(self.is_closed_sub_partial_order(node_ids)) 
+ prev_nodes = set() + node_set = set(node_ids) + for node_id in node_ids: + prev_ids_set = set(self.get_prev(node_id)) + ## KK 2023-05-04 is it ever the case that some (but not all) prev nodes might be outside. I don't think so + prev_nodes = prev_nodes.union(prev_ids_set - node_set) + + ## KK 2024-05-03: I don't see how we can get multiple sources with the current structure + assert(len(prev_nodes) == 1) + return list(prev_nodes) + + ## TODO: Implement this correctly. I have thought of a naive algorithm that + ## does a BFS forward and backward for each node and if we first see a + ## node outside of the set and then one inside it means that the subset is not closed. + def is_closed_sub_partial_order(self, node_ids: "list[int]") -> bool: + # node_set = set(node_ids) + # visited_set = set() + # for node_id in node_ids: + # prev_ids_set = set(self.get_prev(node_id)) + # next_id_set = set(self.get_next(node_id)) + # ## If one of the previous or next nodes is not in the node set + # ## it means that the sub partial order is not closed. 
+ # if not node_set.issuperset(prev_ids_set.union(next_id_set)): + # return False + + return True + def init_partial_order(self): self.init_workset() logging.debug(f'Initialized workset') @@ -189,6 +244,9 @@ def __len__(self): def get_node(self, node_id:int) -> Node: return self.nodes[node_id] + def get_node_loop_context(self, node_id: int) -> "list[int]": + return self.get_node(node_id).get_loop_context() + def get_all_non_committed(self) -> "list[int]": return self.get_transitive_closure(self.frontier) @@ -200,6 +258,11 @@ def filter_standard_nodes(self, node_ids: "list[int]") -> "list[int]": return [node_id for node_id in node_ids if not self.is_loop_node(node_id)] + ## This creates a new node_id and then creates a mapping from the node and iteration id to this node id + def create_standard_id_from_loop_node(self, node_id: int, loop_id: int) -> int: + pass + + ## Returns all non committed non-loop nodes def get_all_non_committed_standard_nodes(self) -> "list[int]": all_non_committed = self.get_all_non_committed() @@ -207,7 +270,10 @@ def get_all_non_committed_standard_nodes(self) -> "list[int]": def get_next(self, node_id:int) -> "list[int]": return self.adjacency[node_id] - + + def get_prev(self, node_id:int) -> "list[int]": + return self.inverse_adjacency[node_id] + def get_transitive_closure(self, target_node_ids:"list[int]") -> "list[int]": all_next_transitive = set(target_node_ids) next_work = target_node_ids.copy() @@ -365,6 +431,15 @@ def rerun_stopped(self): self.to_be_resolved[cmd_id] = [] self.stopped = new_stopped + def find_loop_sub_partial_order(self, loop_id: int) -> "list[int]": + loop_node_ids = [] + for node_id in self.nodes: + loop_context = self.get_node_loop_context(node_id) + if loop_id in loop_context: + loop_node_ids.append(node_id) + ## TODO: Assert that this is closed w.r.t. 
partial order + return loop_node_ids + ## KK 2023-05-02: We should not be able to step/execute/speculate loop nodes, instead ## the only action we should be able to do to them is to unroll them, ## by creating iterations before them in the partial order. @@ -374,6 +449,7 @@ def rerun_stopped(self): ## Note: We have to be careful when unrolling loops to unroll a complete iteration to start with ## (to not have to deal with partial order relations between commands of different iterations). ## + ## ## Concrete pseudocode: ## def unroll(self, loop_id): ## ## Finds all of the nodes in the same loop in the partial order @@ -388,6 +464,28 @@ def rerun_stopped(self): ## ## We need to determine when to call unroll. For now we can just do it if the frontier is empty ## (which means that the next node of the frontier is a loop node). + def unroll_loop(self, loop_id: int): + loop_node_ids = self.find_loop_sub_partial_order(loop_id) + logging.debug(f'Node ids for loop: {loop_id} are: {loop_node_ids}') + ## Get the previous nodes of sub_po + previous_ids = self.get_sub_po_prev_nodes(loop_node_ids) + assert(len(previous_ids) == 1) + previous_id = previous_ids[0] + logging.debug(f'Previous node id for loop: {loop_id} is {previous_id}') + new_loop_node_ids = [self.create_standard_id_from_loop_node(node_id, loop_id) + for node_id in loop_node_ids] + + + def unroll_loop_node(self, node_id: int): + assert(self.is_loop_node(node_id)) + loop_context = self.get_node_loop_context(node_id) + ## TODO: First determine which exactly loop do we need to unroll + ## I am not sure if it is correct to just do the last one + ## I think it might be the difference between this and the previous node + ## + ## TODO: I actually think we have to unroll all loops + self.unroll_loop(loop_context[0]) + def step_forward(self, old_committed): logging.debug(" > Committing frontier") self.commit_frontier() diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 
f08022d4..edd7b376 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -86,8 +86,11 @@ def handle_wait(self, input_cmd: str, connection): node_id, loop_counters = self.__parse_wait(input_cmd) logging.debug(f'Scheduler: Received wait for node_id: {node_id} with loop counters: {loop_counters}') - ## TODO: If node is in a loop, then start executing it now even though - ## it is done executing. + ## If node is in a loop, then start executing it now. + if self.partial_program_order.is_loop_node(node_id): + ## TODO: This unrolling can also happen and be moved to speculation. + ## For now we are being conservative and that is why it only happens here + self.partial_program_order.unroll_loop_node(node_id) ## If the node_id is already committed, just return its exit code if node_id in self.partial_program_order.get_committed(): From cf97e134a157ff6fbeae9e9077b8861b9719d8b6 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 4 May 2023 16:12:15 -0400 Subject: [PATCH 13/32] Make NodeId its own class --- parallel-orch/partial_program_order.py | 56 ++++++++++++++++++++------ parallel-orch/scheduler_server.py | 8 ++-- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index b871322f..7007956b 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -23,12 +23,21 @@ def __str__(self): return f'CompletedNodeInfo(ec:{self.get_exit_code()}, vf:{self.get_variable_file()}, stdout:{self.get_stdout_file()})' class NodeId: - def __init__(self, id: str, loop_iters=None): + def __init__(self, id: int, loop_iters=None): self.id = id - self.loop_iters = loop_iters + if loop_iters is None: + self.loop_iters = [] + else: + self.loop_iters = loop_iters + def __repr__(self): + output = str(self.id) + if len(self.loop_iters) > 0: + output += f':{"-".join([str(it) for it in self.loop_iters])}' + return output + def 
__hash__(self): - return hash((self.id, self.loop_iters)) + return hash(str(self)) def __eq__(self, other): if not len(self.loop_iters) == len(other.loop_iters): @@ -42,6 +51,19 @@ def __ne__(self, other): # Not strictly necessary, but to avoid having both x==y and x!=y # True at the same time return not(self == other) + + ## TODO: Maybe we need to make these better + def __lt__(self, obj): + return (str(self) < str(obj)) + + def __gt__(self, obj): + return (str(self) > str(obj)) + + # def __le__(self, obj): + # return ((self.b) <= (obj.b)) + + # def __ge__(self, obj): + # return ((self.b) >= (obj.b)) class Node: @@ -232,7 +254,8 @@ def loop_nodes_valid(self): # Check if all frontier nodes are after committed nodes def all_frontier_nodes_after_committed_nodes(self): ## TODO: Make this check a proper predecessor check - return max(self.get_committed()) < min(self.frontier) + # return max(self.get_committed()) < min(self.frontier) + return False # Checks if frontier and committed intersect def frontier_and_committed_intersect(self): @@ -264,7 +287,7 @@ def create_standard_id_from_loop_node(self, node_id: int, loop_id: int) -> int: ## Returns all non committed non-loop nodes - def get_all_non_committed_standard_nodes(self) -> "list[int]": + def get_all_non_committed_standard_nodes(self) -> "list[NodeId]": all_non_committed = self.get_all_non_committed() return self.filter_standard_nodes(all_non_committed) @@ -274,7 +297,7 @@ def get_next(self, node_id:int) -> "list[int]": def get_prev(self, node_id:int) -> "list[int]": return self.inverse_adjacency[node_id] - def get_transitive_closure(self, target_node_ids:"list[int]") -> "list[int]": + def get_transitive_closure(self, target_node_ids:"list[NodeId]") -> "list[NodeId]": all_next_transitive = set(target_node_ids) next_work = target_node_ids.copy() while len(next_work) > 0: @@ -323,13 +346,20 @@ def get_node_id_from_cmd_no_redir(self, cmd_no_redir: str) -> int: # Check if the specific command can be resolved. 
- # TODO: this does not truly follow partial program order, we should implement it correctly + # KK 2023-05-04 I am not even sure what this function does and why is it useful. def cmd_can_be_resolved(self, node_id: int) -> bool: # If the command we evaluate has no earlier command currently executing, it can be resolved this round + ## KK 2023-05-04 This does not seem correct. In the future (where we don't speculate everything at once) + ## there might be a case where nothing is executing but a command can still not be resolved. if len(self.get_currently_executing()) > 0: - ## TODO: Modify this to be a proper predecessor so that it works with newly - ## added loop nodes (proper predecessor does not happen with comparing indexes) - return node_id <= min(self.get_currently_executing()) + ## if the node is in the transitive closure of any currently executing commands, + ## then we can't resolve it. + total_transitive_closure = set() + for other in self.get_currently_executing(): + other_tc = set(self.get_transitive_closure([other])) + total_transitive_closure = total_transitive_closure.union(other_tc) + ## If node_id can be reached from the other commands it can't be resolved + return not node_id in total_transitive_closure else: return True @@ -822,12 +852,12 @@ def parse_partial_program_order_from_file(file_path: str) -> PartialProgramOrder file_path = f'{cmds_directory}/{i}' cmd = parse_cmd_from_file(file_path) loop_ctx = loop_contexts[i] - nodes[i] = Node(i, cmd, loop_ctx) + nodes[NodeId(i)] = Node(NodeId(i), cmd, loop_ctx) - edges = {i : [] for i in range(number_of_nodes)} + edges = {NodeId(i) : [] for i in range(number_of_nodes)} for edge_line in edge_lines: from_id, to_id = parse_edge_line(edge_line) - edges[from_id].append(to_id) + edges[NodeId(from_id)].append(NodeId(to_id)) logging.trace(f"Nodes|{','.join([str(node) for node in nodes])}") return PartialProgramOrder(nodes, edges) diff --git a/parallel-orch/scheduler_server.py 
b/parallel-orch/scheduler_server.py index edd7b376..da55f9aa 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -4,7 +4,7 @@ from util import * import config import sys -from partial_program_order import parse_partial_program_order_from_file +from partial_program_order import parse_partial_program_order_from_file, NodeId ## ## A scheduler server @@ -69,10 +69,10 @@ def handle_init(self, input_cmd: str): self.partial_program_order = parse_partial_program_order_from_file(partial_order_file) self.partial_program_order.init_partial_order() - def __parse_wait(self, input_cmd: str) -> int: + def __parse_wait(self, input_cmd: str): try: node_id_component, loop_iter_counter_component = input_cmd.rstrip().split("|") - node_id = int(node_id_component.split(":")[1].rstrip()) + node_id = NodeId(int(node_id_component.split(":")[1].rstrip())) loop_counters_str = loop_iter_counter_component.split(":")[1].rstrip() loop_counters = loop_counters_str.split("-") return node_id, loop_counters @@ -107,7 +107,7 @@ def handle_wait(self, input_cmd: str, connection): def __parse_command_exec_complete(self, input_cmd: str) -> "tuple[int, int]": try: components = input_cmd.rstrip().split("|") - command_id = int(components[0].split(":")[1]) + command_id = NodeId(int(components[0].split(":")[1])) exit_code = int(components[1].split(":")[1]) sandbox_dir = components[2].split(":")[1] return command_id, exit_code, sandbox_dir From 9ecd8d52274207d7a2d876fc7e1843fc91c81df5 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 4 May 2023 17:25:18 -0400 Subject: [PATCH 14/32] Start unrolling the loop --- parallel-orch/partial_program_order.py | 198 +++++++++++++++++++++---- 1 file changed, 166 insertions(+), 32 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 7007956b..8074f24c 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1,3 +1,4 @@ 
+import copy import logging import os import executor @@ -30,6 +31,9 @@ def __init__(self, id: int, loop_iters=None): else: self.loop_iters = loop_iters + def has_iters(self): + return len(self.loop_iters) > 0 + def __repr__(self): output = str(self.id) if len(self.loop_iters) > 0: @@ -72,6 +76,9 @@ def __init__(self, id, cmd, loop_context): self.id = id self.cmd_no_redir = trace.remove_command_redir(self.cmd) self.loop_context = loop_context + ## Keep track of how many iterations of this loop node we have unrolled + if len(loop_context) > 0: + self.current_iter = 0 def __str__(self): # return f"ID: {self.id}\nCMD: {self.cmd}\nR: {self.read_set}\nW: {self.write_set}" @@ -93,6 +100,11 @@ def get_loop_context(self) -> "list[int]": def in_loop(self) -> bool: return len(self.loop_context) > 0 + def get_next_iter(self) -> int: + assert(self.in_loop()) + self.current_iter += 1 + return self.current_iter + ## Note: This information is valid only after a node is committed. ## It might be set even before that, but it should only be retrieved when ## a node is committed. @@ -169,13 +181,43 @@ def get_source_nodes(self) -> list: return list(sources) ## This returns all previous nodes of a sub partial order - def get_sub_po_prev_nodes(self, node_ids: "list[int]") -> "list[int]": + def get_sub_po_source_nodes(self, node_ids: "list[NodeId]") -> "list[NodeId]": # assert(self.is_closed_sub_partial_order(node_ids)) - prev_nodes = set() + source_nodes = list() node_set = set(node_ids) for node_id in node_ids: prev_ids_set = set(self.get_prev(node_id)) ## KK 2023-05-04 is it ever the case that some (but not all) prev nodes might be outside. 
I don't think so + if len(prev_ids_set) == 0 or \ + not prev_ids_set.issubset(node_set): + source_nodes.append(node_id) + + ## KK 2024-05-03: I don't see how we can get multiple sources with the current structure + assert(len(source_nodes) == 1) + return source_nodes + + def get_sub_po_sink_nodes(self, node_ids: "list[NodeId]") -> "list[NodeId]": + # assert(self.is_closed_sub_partial_order(node_ids)) + sink_nodes = list() + node_set = set(node_ids) + for node_id in node_ids: + next_ids_set = set(self.get_next(node_id)) + ## KK 2023-05-04 is it ever the case that some (but not all) prev nodes might be outside. I don't think so + if len(next_ids_set) == 0 or \ + not next_ids_set.issubset(node_set): + sink_nodes.append(node_id) + + ## KK 2024-05-03: I don't see how we can get multiple sink with the current structure + assert(len(sink_nodes) == 1) + return sink_nodes + + ## This returns all previous nodes of a sub partial order + def get_sub_po_prev_nodes(self, node_ids: "list[NodeId]") -> "list[NodeId]": + # assert(self.is_closed_sub_partial_order(node_ids)) + prev_nodes = set() + node_set = set(node_ids) + for node_id in node_ids: + prev_ids_set = set(self.get_prev(node_id)) prev_nodes = prev_nodes.union(prev_ids_set - node_set) ## KK 2024-05-03: I don't see how we can get multiple sources with the current structure @@ -185,7 +227,7 @@ def get_sub_po_prev_nodes(self, node_ids: "list[int]") -> "list[int]": ## TODO: Implement this correctly. I have thought of a naive algorithm that ## does a BFS forward and backward for each node and if we first see a ## node outside of the set and then one inside it means that the subset is not closed. 
- def is_closed_sub_partial_order(self, node_ids: "list[int]") -> bool: + def is_closed_sub_partial_order(self, node_ids: "list[NodeId]") -> bool: # node_set = set(node_ids) # visited_set = set() # for node_id in node_ids: @@ -264,26 +306,30 @@ def frontier_and_committed_intersect(self): def __len__(self): return len(self.nodes) - def get_node(self, node_id:int) -> Node: + def get_node(self, node_id:NodeId) -> Node: return self.nodes[node_id] - def get_node_loop_context(self, node_id: int) -> "list[int]": + def get_node_loop_context(self, node_id: NodeId) -> "list[int]": return self.get_node(node_id).get_loop_context() - def get_all_non_committed(self) -> "list[int]": + def get_all_non_committed(self) -> "list[NodeId]": return self.get_transitive_closure(self.frontier) - def is_loop_node(self, node_id:int) -> bool: + def is_loop_node(self, node_id:NodeId) -> bool: return self.get_node(node_id).in_loop() ## Only keeps standard (non-loop) nodes - def filter_standard_nodes(self, node_ids: "list[int]") -> "list[int]": + def filter_standard_nodes(self, node_ids: "list[NodeId]") -> "list[NodeId]": return [node_id for node_id in node_ids if not self.is_loop_node(node_id)] ## This creates a new node_id and then creates a mapping from the node and iteration id to this node id - def create_standard_id_from_loop_node(self, node_id: int, loop_id: int) -> int: - pass + ## TODO: Currently doesn't work with nested loops + def create_standard_id_from_loop_node(self, node_id: NodeId, loop_id: int) -> NodeId: + node = self.get_node(node_id) + new_iter = node.get_next_iter() + assert(not node_id.has_iters()) + return NodeId(node_id.id, [new_iter]) ## Returns all non committed non-loop nodes @@ -291,11 +337,26 @@ def get_all_non_committed_standard_nodes(self) -> "list[NodeId]": all_non_committed = self.get_all_non_committed() return self.filter_standard_nodes(all_non_committed) - def get_next(self, node_id:int) -> "list[int]": - return self.adjacency[node_id] + def get_next(self, 
node_id:NodeId) -> "list[NodeId]": + return self.adjacency[node_id][:] - def get_prev(self, node_id:int) -> "list[int]": - return self.inverse_adjacency[node_id] + def get_prev(self, node_id:NodeId) -> "list[NodeId]": + return self.inverse_adjacency[node_id][:] + + def reroute_edge_from(self, old_from: NodeId, new_from: NodeId, to: NodeId): + self.adjacency[old_from].remove(to) + ## KK 2023-05-04 Is it a problem that we append? Maybe we should make that a set + self.adjacency[new_from].append(to) + self.inverse_adjacency[to] = PartialProgramOrder.map_using_mapping(self.inverse_adjacency[to], + {old_from: new_from}) + + def reroute_edge_to(self, from_id: NodeId, old_to: NodeId, new_to: NodeId): + self.inverse_adjacency[old_to].remove(from_id) + ## KK 2023-05-04 Is it a problem that we append? Maybe we should make that a set + self.inverse_adjacency[new_to].append(from_id) + self.adjacency[from_id] = PartialProgramOrder.map_using_mapping(self.adjacency[from_id], + {old_to: new_to}) + def get_transitive_closure(self, target_node_ids:"list[NodeId]") -> "list[NodeId]": all_next_transitive = set(target_node_ids) @@ -319,7 +380,7 @@ def get_transitive_closure_if_can_be_resolved(self, can_be_resolved: list, targe next_work.extend(new_next) return list(all_next_transitive) - def is_frontier(self, node_id: int) -> bool: + def is_frontier(self, node_id: NodeId) -> bool: return node_id in self.frontier def update_rw_set(self, node_id, rw_set): @@ -331,18 +392,18 @@ def get_rw_set(self, node_id) -> RWSet: def get_rw_sets(self) -> dict: return self.rw_sets - def add_to_read_set(self, node_id: int, item: str): + def add_to_read_set(self, node_id: NodeId, item: str): self.rw_sets[node_id].add_to_read_set(item) - def add_to_write_set(self, node_id: int, item: str): + def add_to_write_set(self, node_id: NodeId, item: str): self.rw_sets[node_id].add_to_write_set(item) # TODO: HACK delete this method ASAP - def get_node_id_from_cmd_no_redir(self, cmd_no_redir: str) -> int: - for 
node_id, node in self.nodes.items(): - if node.get_cmd_no_redir() == cmd_no_redir: - return node_id - assert(False) + # def get_node_id_from_cmd_no_redir(self, cmd_no_redir: str) -> NodeId: + # for node_id, node in self.nodes.items(): + # if node.get_cmd_no_redir() == cmd_no_redir: + # return node_id + # assert(False) # Check if the specific command can be resolved. @@ -461,7 +522,7 @@ def rerun_stopped(self): self.to_be_resolved[cmd_id] = [] self.stopped = new_stopped - def find_loop_sub_partial_order(self, loop_id: int) -> "list[int]": + def find_loop_sub_partial_order(self, loop_id: int) -> "list[NodeId]": loop_node_ids = [] for node_id in self.nodes: loop_context = self.get_node_loop_context(node_id) @@ -502,11 +563,83 @@ def unroll_loop(self, loop_id: int): assert(len(previous_ids) == 1) previous_id = previous_ids[0] logging.debug(f'Previous node id for loop: {loop_id} is {previous_id}') - new_loop_node_ids = [self.create_standard_id_from_loop_node(node_id, loop_id) - for node_id in loop_node_ids] + + ## Create the new nodes and remap adjacencies accordingly + node_mappings = {} + for node_id in loop_node_ids: + node = self.get_node(node_id) + new_loop_node_id = self.create_standard_id_from_loop_node(node_id, loop_id) + node_mappings[node_id] = new_loop_node_id + ## Create the new node + self.nodes[new_loop_node_id] = Node(new_loop_node_id, node.cmd, []) + logging.debug(f'New loop ids: {node_mappings}') + + ## Create the new adjacencies, by mapping adjacencies in the node set to the new node ids + ## and leaving outside adjacencies as they are + for _, new_node_id in node_mappings.items(): + self.adjacency[new_node_id] = [] + + for node_id, new_node_id in node_mappings.items(): + old_prev_ids = self.get_prev(node_id) + ## Modify all id to be in the new set except for the + new_prev_ids = PartialProgramOrder.map_using_mapping(old_prev_ids, node_mappings) + self.inverse_adjacency[new_node_id] = new_prev_ids + for new_prev_id in new_prev_ids: + 
self.adjacency[new_prev_id].append(new_node_id) + + print(self.nodes) + print(self.adjacency) + print(self.inverse_adjacency) + + ## TODO: The rest of the code here makes assumptions about the shape of the partial order + + ## Modify the previous node of the loop nodes + new_nodes_sinks = self.get_sub_po_sink_nodes(list(node_mappings.values())) + assert(len(new_nodes_sinks) == 1) + new_nodes_sink = new_nodes_sinks[0] + + old_nodes_sources = self.get_sub_po_source_nodes(list(node_mappings.keys())) + assert(len(old_nodes_sources) == 1) + old_nodes_source = old_nodes_sources[0] + + old_next_node_ids = self.get_next(new_nodes_sink) + assert(len(old_next_node_ids) <= 1) + + self.reroute_edge_from(old_from=previous_id, + new_from=new_nodes_sink, + to=old_nodes_source) + + print(self.nodes) + print(self.adjacency) + print(self.inverse_adjacency) + + ## Modify the next node of the new po + if len(old_next_node_ids) > 0: + assert(len(old_next_node_ids) == 1) + old_next_node_id = old_next_node_ids[0] + self.reroute_edge_to(old_to=old_next_node_id, + new_to=new_nodes_sink, + from_id=new_nodes_sink) + + + print(self.nodes) + print(self.adjacency) + print(self.inverse_adjacency) + + ## Static method that just maps using a node mapping dictionary or leaves them as + ## they are if not + def map_using_mapping(node_ids: "list[NodeId]", mapping) -> "list[NodeId]": + new_node_ids = [] + for node_id in node_ids: + if node_id in mapping: + new_id = copy.deepcopy(mapping[node_id]) + else: + new_id = copy.deepcopy(node_id) + new_node_ids.append(new_id) + return new_node_ids - def unroll_loop_node(self, node_id: int): + def unroll_loop_node(self, node_id: NodeId): assert(self.is_loop_node(node_id)) loop_context = self.get_node_loop_context(node_id) ## TODO: First determine which exactly loop do we need to unroll @@ -548,7 +681,7 @@ def move_frontier_forward(self): logging.trace(f"FrontierAdd|{node}") self.frontier = new_frontier - def get_next_standard_non_speculated(self, start: int) -> 
"list[int]": + def get_next_standard_non_speculated(self, start: NodeId) -> "list[NodeId]": next_non_speculated = self.get_next_non_speculated(start) return self.filter_standard_nodes(next_non_speculated) @@ -639,7 +772,7 @@ def run_all_frontier_cmds(self): self.run_cmd_non_blocking(cmd_id) ## Run a command and add it to the dictionary of executing ones - def run_cmd_non_blocking(self, node_id: int): + def run_cmd_non_blocking(self, node_id: NodeId): ## A command should only be run if it's in the frontier, otherwise it should be spec run assert(self.is_frontier(node_id)) node = self.get_node(node_id) @@ -651,7 +784,7 @@ def run_cmd_non_blocking(self, node_id: int): self.commands_currently_executing[node_id] = (proc, trace_file, stdout, stderr, variable_file) ## Run a command and add it to the dictionary of executing ones - def speculate_cmd_non_blocking(self, node_id: int): + def speculate_cmd_non_blocking(self, node_id: NodeId): node = self.get_node(node_id) cmd = node.get_cmd() logging.debug(f'Speculating command: {node_id} {self.get_node(node_id)}') @@ -660,7 +793,7 @@ def speculate_cmd_non_blocking(self, node_id: int): logging.debug(f'Read trace from: {trace_file}') self.commands_currently_executing[node_id] = (proc, trace_file, stdout, stderr, variable_file) - def command_execution_completed(self, node_id: int, riker_exit_code:int, sandbox_dir: str): + def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sandbox_dir: str): logging.debug(f" --- Node {node_id}, just finished execution ---") self.sandbox_dirs[node_id] = sandbox_dir ## TODO: Store variable file somewhere so that we can return when wait @@ -760,13 +893,13 @@ def populate_to_be_resolved_dict(self, old_committed): # relevant_committed = old_committed relevant_committed = self.committed if node_id not in relevant_committed: - to_add = self.inverse_adjacency[node_id].copy() + to_add = self.get_prev(node_id).copy() traversal = to_add.copy() to_be_resolved_nodes_ids = to_add.copy() 
while len(traversal) > 0: current_node_id = traversal.pop(0) if current_node_id not in relevant_committed: - to_add = self.inverse_adjacency[current_node_id] + to_add = self.get_prev(current_node_id) to_be_resolved_nodes_ids.extend(to_add) traversal.extend(to_add) self.to_be_resolved[node_id] = to_be_resolved_nodes_ids.copy() @@ -860,4 +993,5 @@ def parse_partial_program_order_from_file(file_path: str) -> PartialProgramOrder edges[NodeId(from_id)].append(NodeId(to_id)) logging.trace(f"Nodes|{','.join([str(node) for node in nodes])}") + logging.trace(f"Edges: {edges}") return PartialProgramOrder(nodes, edges) From 8a0d787441849f7a733ba3a9ab68c7fb63891d04 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 4 May 2023 17:54:20 -0400 Subject: [PATCH 15/32] rough first draft of loops seems to be working --- parallel-orch/partial_program_order.py | 23 +++++++++++++++++++++-- parallel-orch/scheduler_server.py | 23 ++++++++++++++++------- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 8074f24c..e1584bb7 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -37,7 +37,7 @@ def has_iters(self): def __repr__(self): output = str(self.id) if len(self.loop_iters) > 0: - output += f':{"-".join([str(it) for it in self.loop_iters])}' + output += f'+{"-".join([str(it) for it in self.loop_iters])}' return output def __hash__(self): @@ -69,6 +69,13 @@ def __gt__(self, obj): # def __ge__(self, obj): # return ((self.b) >= (obj.b)) +def parse_node_id(node_id_str: str) -> NodeId: + if "+" in node_id_str: + node_id_int, iters_str = node_id_str.split("+") + iters = [int(it) for it in iters_str.split("-")] + return NodeId(int(node_id_int), iters) + else: + return NodeId(int(node_id_str)) class Node: def __init__(self, id, cmd, loop_context): @@ -626,6 +633,9 @@ def unroll_loop(self, loop_id: int): print(self.adjacency) 
print(self.inverse_adjacency) + ## Return the new first node + return node_mappings[old_nodes_source] + ## Static method that just maps using a node mapping dictionary or leaves them as ## they are if not def map_using_mapping(node_ids: "list[NodeId]", mapping) -> "list[NodeId]": @@ -647,7 +657,13 @@ def unroll_loop_node(self, node_id: NodeId): ## I think it might be the difference between this and the previous node ## ## TODO: I actually think we have to unroll all loops - self.unroll_loop(loop_context[0]) + new_first_node_id = self.unroll_loop(loop_context[0]) + + ## TODO: This needs to change when we modify unrolling to happen speculatively too + ## TODO: This needs to properly add the node to frontier and to resolve dictionary + self.step_forward(copy.deepcopy(self.committed)) + self.frontier.append(new_first_node_id) + def step_forward(self, old_committed): logging.debug(" > Committing frontier") @@ -742,6 +758,9 @@ def has_forward_dependency(self, first_id, second_id): ## TODO: Eventually, in the future, let's add here some form of limit def schedule_work(self, limit=0): # self.log_partial_program_order_info() + logging.debug("Scheduling work...") + ## KK 2023-05-04 Is it a problem if we do that here? 
+ # self.step_forward(copy.deepcopy(self.committed)) self.run_all_frontier_cmds() self.schedule_all_workset_non_frontier_cmds() assert(self.valid()) diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index da55f9aa..5cd34d0b 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -1,10 +1,11 @@ import argparse +import copy import logging import signal from util import * import config import sys -from partial_program_order import parse_partial_program_order_from_file, NodeId +from partial_program_order import parse_partial_program_order_from_file, NodeId, parse_node_id ## ## A scheduler server @@ -74,7 +75,10 @@ def __parse_wait(self, input_cmd: str): node_id_component, loop_iter_counter_component = input_cmd.rstrip().split("|") node_id = NodeId(int(node_id_component.split(":")[1].rstrip())) loop_counters_str = loop_iter_counter_component.split(":")[1].rstrip() - loop_counters = loop_counters_str.split("-") + if loop_counters_str == "None": + loop_counters = [] + else: + loop_counters = [int(cnt) for cnt in loop_counters_str.split("-")] return node_id, loop_counters except: raise Exception(f'Parsing failure for line: {input_cmd}') @@ -83,14 +87,19 @@ def handle_wait(self, input_cmd: str, connection): assert(input_cmd.startswith("Wait")) ## We have received this message by the JIT, which waits for a node_id to ## finish execution. - node_id, loop_counters = self.__parse_wait(input_cmd) - logging.debug(f'Scheduler: Received wait for node_id: {node_id} with loop counters: {loop_counters}') + raw_node_id, loop_counters = self.__parse_wait(input_cmd) + logging.debug(f'Scheduler: Received wait for node_id: {raw_node_id} with loop counters: {loop_counters}') ## If node is in a loop, then start executing it now. - if self.partial_program_order.is_loop_node(node_id): + if self.partial_program_order.is_loop_node(raw_node_id): ## TODO: This unrolling can also happen and be moved to speculation. 
## For now we are being conservative and that is why it only happens here - self.partial_program_order.unroll_loop_node(node_id) + self.partial_program_order.unroll_loop_node(raw_node_id) + + if self.partial_program_order.is_loop_node(raw_node_id): + node_id = NodeId(raw_node_id.id, loop_counters) + else: + node_id = raw_node_id ## If the node_id is already committed, just return its exit code if node_id in self.partial_program_order.get_committed(): @@ -107,7 +116,7 @@ def handle_wait(self, input_cmd: str, connection): def __parse_command_exec_complete(self, input_cmd: str) -> "tuple[int, int]": try: components = input_cmd.rstrip().split("|") - command_id = NodeId(int(components[0].split(":")[1])) + command_id = parse_node_id(components[0].split(":")[1]) exit_code = int(components[1].split(":")[1]) sandbox_dir = components[2].split(":")[1] return command_id, exit_code, sandbox_dir From 0001a66af6ec5302a0667e9090c6b4e293cc1ad0 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 4 May 2023 18:02:28 -0400 Subject: [PATCH 16/32] Solve a bug where the rkr cache was there and so there was no reexecution of the same consecutive command --- parallel-orch/template_script_to_execute.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/parallel-orch/template_script_to_execute.sh b/parallel-orch/template_script_to_execute.sh index 58ff87d2..9e97a141 100755 --- a/parallel-orch/template_script_to_execute.sh +++ b/parallel-orch/template_script_to_execute.sh @@ -2,6 +2,11 @@ ## TODO: Pass frontier flag here instead of separate scripts +## Clean up the riker directory +## KK 2023-05-04 should this be done somewhere else? Could this interfere with overlay fs? 
+## TODO: Can we just ask riker to use a different cache (or put the cache to /dev/null) +## since we never really want it to take the cache into account +rm -rf ./.rkr ## Save the script to execute in the sandboxdir echo $CMD_STRING > ./Rikerfile From 7f16341a8b72f9bcc2cb6536cb7c943477844c0a Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 4 May 2023 18:03:01 -0400 Subject: [PATCH 17/32] first draft of loop works, next fix multiple commands in the same loop --- loop.sh | 13 +++++++++---- parallel-orch/partial_program_order.py | 13 ------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/loop.sh b/loop.sh index 7fcfdf0b..c9be532b 100644 --- a/loop.sh +++ b/loop.sh @@ -1,5 +1,10 @@ echo hi -for i in 1 2 3; do - ## Until we figure out variables the only way to determine that this run twice is by measuring the executed time - sleep 2 -done \ No newline at end of file +for i in 1 2 3 4 5; do + ## Can't do nested loops yet + # for j in 1 2; do + ## Until we figure out variables the only way to determine that this run twice is by measuring the executed time + sleep 1 + ## TODO: Manage to do multiple commands in a single loop + echo hi + # done +done diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index e1584bb7..3d2e5a7b 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -594,10 +594,6 @@ def unroll_loop(self, loop_id: int): for new_prev_id in new_prev_ids: self.adjacency[new_prev_id].append(new_node_id) - print(self.nodes) - print(self.adjacency) - print(self.inverse_adjacency) - ## TODO: The rest of the code here makes assumptions about the shape of the partial order ## Modify the previous node of the loop nodes @@ -616,10 +612,6 @@ def unroll_loop(self, loop_id: int): new_from=new_nodes_sink, to=old_nodes_source) - print(self.nodes) - print(self.adjacency) - print(self.inverse_adjacency) - ## Modify the next node of the new po if 
len(old_next_node_ids) > 0: assert(len(old_next_node_ids) == 1) @@ -627,11 +619,6 @@ def unroll_loop(self, loop_id: int): self.reroute_edge_to(old_to=old_next_node_id, new_to=new_nodes_sink, from_id=new_nodes_sink) - - - print(self.nodes) - print(self.adjacency) - print(self.inverse_adjacency) ## Return the new first node return node_mappings[old_nodes_source] From 66914545bf7a3ee3e79dfb79712f0a092604cd9e Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 16:11:07 -0400 Subject: [PATCH 18/32] refactor dependency resolution --- parallel-orch/partial_program_order.py | 46 ++++++++++++++------------ 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 3d2e5a7b..7d473940 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -473,27 +473,8 @@ def resolve_dependencies(self, cmds_to_resolve): ## Resolve all the forward dependencies and update the workset ## Forward dependency is when a command's output is the same ## as the input of a following command - def resolve_dependencies_continuous_and_move_frontier(self, new_node_id): + def resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): self.log_partial_program_order_info() - # We want to check every single command that has already finished executing but - # not yet able to be resolved - logging.debug(f"Finding sets of commands that can be resolved after {new_node_id} finished executing") - if new_node_id not in self.stopped: - cmds_to_resolve = self.find_cmds_to_resolve(sorted(list(self.waiting_to_be_resolved.union({new_node_id})))) - else: - logging.debug(f"Node {new_node_id} exited with an error. 
Not resolving dependencies") - if new_node_id in self.workset: - self.workset.remove(new_node_id) - logging.trace(f"WorksetRemove|{new_node_id}") - cmds_to_resolve = [] - logging.debug(f"Commands to check for dependencies this round are: {sorted(cmds_to_resolve)}") - logging.debug(f"Commands that cannot be resolved this round are: {sorted(self.waiting_to_be_resolved)}") - - # If no commands can be resolved this round, - # do nothing and wait until a new command finishes executing - if len(cmds_to_resolve) == 0: - logging.debug("No resolvable nodes were found in this round, nothing will change...") - return [] logging.debug(f"Commands to be checked for dependencies: {sorted(cmds_to_resolve)}") logging.debug(" --- Starting dependency resolution --- ") @@ -563,6 +544,7 @@ def find_loop_sub_partial_order(self, loop_id: int) -> "list[NodeId]": ## We need to determine when to call unroll. For now we can just do it if the frontier is empty ## (which means that the next node of the frontier is a loop node). 
def unroll_loop(self, loop_id: int): + logging.info(f'Unrolling loop with id: {loop_id}') loop_node_ids = self.find_loop_sub_partial_order(loop_id) logging.debug(f'Node ids for loop: {loop_id} are: {loop_node_ids}') ## Get the previous nodes of sub_po @@ -827,10 +809,31 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand logging.trace(f"StoppedAdd|{node_id}:error") self.stopped.add(node_id) else: + read_set, write_set = trace.parse_and_gather_cmd_rw_sets(trace_object) rw_set = RWSet(read_set, write_set) self.update_rw_set(node_id, rw_set) - to_commit = self.resolve_dependencies_continuous_and_move_frontier(node_id) + + ## Now that command `node_id` is done executing, we can check which other commands + ## can be resolved (that might have finished execution before but were waiting on `node_id`) + logging.debug(f"Finding sets of commands that can be resolved after {node_id} finished executing") + if node_id in self.stopped: + logging.debug(f"Nothing new to be resolved since {node_id} exited with an error.") + if node_id in self.workset: + self.workset.remove(node_id) + logging.trace(f"WorksetRemove|{node_id}") + # If no commands can be resolved this round, + # do nothing and wait until a new command finishes executing + logging.debug("No resolvable nodes were found in this round, nothing will change...") + return + + assert(node_id not in self.stopped) + cmds_to_resolve = self.find_cmds_to_resolve(sorted(list(self.waiting_to_be_resolved.union({node_id})))) + logging.debug(f"Commands to check for dependencies this round are: {sorted(cmds_to_resolve)}") + logging.debug(f"Commands that cannot be resolved this round are: {sorted(self.waiting_to_be_resolved)}") + + ## Resolve dependencies for the commands that can actually be resolved + to_commit = self.resolve_dependencies_continuous_and_move_frontier(cmds_to_resolve) if len(to_commit) == 0: logging.debug(" > No nodes to be committed this round") else: @@ -890,6 +893,7 @@ def 
populate_to_be_resolved_dict(self, old_committed): logging.debug(f" > Node: {node_id} is currently executing, skipping...") continue else: + ## TODO: KK Need to figure out what to do here (by adding commands after the loop) logging.debug(f" > Node: {node_id} is not executing or waiting to be resolved so we modify its set.") self.to_be_resolved[node_id] = [] traversal = [] From 004b506415f02dfbf53579b2c495d9fdcf74e04c Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 16:31:51 -0400 Subject: [PATCH 19/32] Fix the issue with multiple commands in the same loop --- parallel-orch/partial_program_order.py | 9 ++++++++- parallel-orch/scheduler_server.py | 15 ++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 7d473940..99e1588f 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -316,6 +316,9 @@ def __len__(self): def get_node(self, node_id:NodeId) -> Node: return self.nodes[node_id] + def is_node_id(self, node_id:NodeId) -> bool: + return node_id in self.nodes + def get_node_loop_context(self, node_id: NodeId) -> "list[int]": return self.get_node(node_id).get_loop_context() @@ -602,6 +605,10 @@ def unroll_loop(self, loop_id: int): new_to=new_nodes_sink, from_id=new_nodes_sink) + ## Add all new nodes to the workset (since they have to be tracked) + for _, new_node_id in node_mappings.items(): + self.workset.append(new_node_id) + ## Return the new first node return node_mappings[old_nodes_source] @@ -676,7 +683,7 @@ def get_next_non_speculated(self, start): while len(traversal_workset) > 0: node_id = traversal_workset.pop() ## KK 2023-05-04: Why is this happening in a get_next_non_speculated_traversal? - ## TODO: Move this outside in some effectful method + ## TODO: Move this outside in some effectful method. @Giorgo could you help? 
if node_id not in self.get_currently_executing() \ and node_id not in self.get_committed() \ and node_id not in self.stopped \ diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 5cd34d0b..5d541461 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -89,16 +89,17 @@ def handle_wait(self, input_cmd: str, connection): ## finish execution. raw_node_id, loop_counters = self.__parse_wait(input_cmd) logging.debug(f'Scheduler: Received wait for node_id: {raw_node_id} with loop counters: {loop_counters}') - - ## If node is in a loop, then start executing it now. - if self.partial_program_order.is_loop_node(raw_node_id): - ## TODO: This unrolling can also happen and be moved to speculation. - ## For now we are being conservative and that is why it only happens here - self.partial_program_order.unroll_loop_node(raw_node_id) - + if self.partial_program_order.is_loop_node(raw_node_id): + ## If node is in a loop and the exact node is not in the graph (meaning that the loop has not been unrolled), + ## then start executing it now. node_id = NodeId(raw_node_id.id, loop_counters) + if not self.partial_program_order.is_node_id(node_id): + ## TODO: This unrolling can also happen and be moved to speculation. 
+ ## For now we are being conservative and that is why it only happens here + self.partial_program_order.unroll_loop_node(raw_node_id) else: + ## If we are not in a loop, then the node id corresponds to the concrete node node_id = raw_node_id ## If the node_id is already committed, just return its exit code From edea9740c597e3ab407b826d859ba21b020be67c Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 17:19:21 -0400 Subject: [PATCH 20/32] follow latest pash branch --- deps/pash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/pash b/deps/pash index db97991a..003b03b9 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit db97991a67e619b1cc83c4afca78cdf1422d76c1 +Subproject commit 003b03b962b11838ee6de2f7f59d8358ba0b3668 From 92fdefafeb8c1be2df86f56c661345b59bf1fd5e Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 17:19:52 -0400 Subject: [PATCH 21/32] Find non committed nodes properly --- parallel-orch/partial_program_order.py | 42 +++++++++++++++++++------- parallel-orch/scheduler_server.py | 5 ++- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 99e1588f..bbec4646 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -34,6 +34,9 @@ def __init__(self, id: int, loop_iters=None): def has_iters(self): return len(self.loop_iters) > 0 + def get_non_iter_id(self): + return self.id + def __repr__(self): output = str(self.id) if len(self.loop_iters) > 0: @@ -157,7 +160,7 @@ def __init__(self, nodes, edges): ## TODO: Add assertions that committed etc do not contain loop nodes self.committed = set() ## Nodes that are in the frontier can only move to committed - self.frontier = self.get_source_nodes() + self.frontier = [] self.rw_sets = {node_id: None for node_id in self.nodes.keys()} self.workset = [] ## A dictionary from cmd_ids that are currently 
executing that contains their trace_files @@ -187,6 +190,10 @@ def get_source_nodes(self) -> list: sources.add(to_id) return list(sources) + def get_standard_source_nodes(self) -> list: + source_nodes = self.get_source_nodes() + return self.filter_standard_nodes(source_nodes) + ## This returns all previous nodes of a sub partial order def get_sub_po_source_nodes(self, node_ids: "list[NodeId]") -> "list[NodeId]": # assert(self.is_closed_sub_partial_order(node_ids)) @@ -228,7 +235,7 @@ def get_sub_po_prev_nodes(self, node_ids: "list[NodeId]") -> "list[NodeId]": prev_nodes = prev_nodes.union(prev_ids_set - node_set) ## KK 2024-05-03: I don't see how we can get multiple sources with the current structure - assert(len(prev_nodes) == 1) + assert(len(prev_nodes) <= 1) return list(prev_nodes) ## TODO: Implement this correctly. I have thought of a naive algorithm that @@ -248,12 +255,17 @@ def is_closed_sub_partial_order(self, node_ids: "list[NodeId]") -> bool: return True def init_partial_order(self): + ## Initialize the frontier with all non-loop source nodes + self.frontier = self.get_standard_source_nodes() + ## Initialize the workset self.init_workset() logging.debug(f'Initialized workset') self.populate_to_be_resolved_dict([]) logging.debug(f'To be resolved sets per node:') logging.debug(self.to_be_resolved) assert(self.valid()) + logging.info(f'Initialized the partial order!') + self.log_partial_program_order_info() def init_workset(self): self.workset = self.get_all_non_committed_standard_nodes() @@ -323,7 +335,10 @@ def get_node_loop_context(self, node_id: NodeId) -> "list[int]": return self.get_node(node_id).get_loop_context() def get_all_non_committed(self) -> "list[NodeId]": - return self.get_transitive_closure(self.frontier) + all_node_ids = self.nodes.keys() + non_committed_node_ids = [node_id for node_id in all_node_ids + if not node_id in self.committed] + return non_committed_node_ids def is_loop_node(self, node_id:NodeId) -> bool: return 
self.get_node(node_id).in_loop() @@ -345,6 +360,7 @@ def create_standard_id_from_loop_node(self, node_id: NodeId, loop_id: int) -> No ## Returns all non committed non-loop nodes def get_all_non_committed_standard_nodes(self) -> "list[NodeId]": all_non_committed = self.get_all_non_committed() + logging.debug(f"All non committed nodes: {all_non_committed}") return self.filter_standard_nodes(all_non_committed) def get_next(self, node_id:NodeId) -> "list[NodeId]": @@ -550,11 +566,6 @@ def unroll_loop(self, loop_id: int): logging.info(f'Unrolling loop with id: {loop_id}') loop_node_ids = self.find_loop_sub_partial_order(loop_id) logging.debug(f'Node ids for loop: {loop_id} are: {loop_node_ids}') - ## Get the previous nodes of sub_po - previous_ids = self.get_sub_po_prev_nodes(loop_node_ids) - assert(len(previous_ids) == 1) - previous_id = previous_ids[0] - logging.debug(f'Previous node id for loop: {loop_id} is {previous_id}') ## Create the new nodes and remap adjacencies accordingly node_mappings = {} @@ -593,9 +604,15 @@ def unroll_loop(self, loop_id: int): old_next_node_ids = self.get_next(new_nodes_sink) assert(len(old_next_node_ids) <= 1) - self.reroute_edge_from(old_from=previous_id, - new_from=new_nodes_sink, - to=old_nodes_source) + + ## Get the previous nodes of sub_po and only if one exists, reroute the edge + previous_ids = self.get_sub_po_prev_nodes(loop_node_ids) + if len(previous_ids) == 1: + previous_id = previous_ids[0] + logging.debug(f'Previous node id for loop: {loop_id} is {previous_id}') + self.reroute_edge_from(old_from=previous_id, + new_from=new_nodes_sink, + to=old_nodes_source) ## Modify the next node of the new po if len(old_next_node_ids) > 0: @@ -737,6 +754,9 @@ def schedule_work(self, limit=0): logging.debug("Scheduling work...") ## KK 2023-05-04 Is it a problem if we do that here? 
# self.step_forward(copy.deepcopy(self.committed)) + + ## TODO: Move loop unrolling here for speculation too + self.run_all_frontier_cmds() self.schedule_all_workset_non_frontier_cmds() assert(self.valid()) diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 5d541461..3c09bfe3 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -90,13 +90,12 @@ def handle_wait(self, input_cmd: str, connection): raw_node_id, loop_counters = self.__parse_wait(input_cmd) logging.debug(f'Scheduler: Received wait for node_id: {raw_node_id} with loop counters: {loop_counters}') - if self.partial_program_order.is_loop_node(raw_node_id): - ## If node is in a loop and the exact node is not in the graph (meaning that the loop has not been unrolled), - ## then start executing it now. + if self.partial_program_order.is_loop_node(raw_node_id): node_id = NodeId(raw_node_id.id, loop_counters) if not self.partial_program_order.is_node_id(node_id): ## TODO: This unrolling can also happen and be moved to speculation. 
## For now we are being conservative and that is why it only happens here + ## TODO: Move this to the scheduler.schedule_work() (if we have a loop node waiting for response and we are not unrolled, unroll to create work) self.partial_program_order.unroll_loop_node(raw_node_id) else: ## If we are not in a loop, then the node id corresponds to the concrete node From bc76c94e420deaeb54479e491a07b6d34b278a6b Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 17:52:52 -0400 Subject: [PATCH 22/32] correctly reroute edges when unrolling a loop --- parallel-orch/partial_program_order.py | 57 +++++++++++++------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index bbec4646..a5f1f5a4 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -263,16 +263,17 @@ def init_partial_order(self): self.populate_to_be_resolved_dict([]) logging.debug(f'To be resolved sets per node:') logging.debug(self.to_be_resolved) - assert(self.valid()) logging.info(f'Initialized the partial order!') self.log_partial_program_order_info() + + assert(self.valid()) def init_workset(self): self.workset = self.get_all_non_committed_standard_nodes() ## Check if the partial order is done def is_completed(self) -> bool: - return len(self.get_all_non_committed()) == 0 + return len(self.get_all_non_committed_standard_nodes()) == 0 def get_workset(self) -> list: return self.workset @@ -292,6 +293,7 @@ def init_inverse_adjacency(self): # ## TODO: (When there is time) Define a function that checks that the graph is valid ## TODO: Call valid and add assertions for loops here. 
def valid(self): + logging.debug("Checking partial order validity...") self.log_partial_program_order_info() valid1 = self.loop_nodes_valid() ## TODO: Fix the checks below because they do not work currently @@ -368,21 +370,15 @@ def get_next(self, node_id:NodeId) -> "list[NodeId]": def get_prev(self, node_id:NodeId) -> "list[NodeId]": return self.inverse_adjacency[node_id][:] - - def reroute_edge_from(self, old_from: NodeId, new_from: NodeId, to: NodeId): - self.adjacency[old_from].remove(to) - ## KK 2023-05-04 Is it a problem that we append? Maybe we should make that a set - self.adjacency[new_from].append(to) - self.inverse_adjacency[to] = PartialProgramOrder.map_using_mapping(self.inverse_adjacency[to], - {old_from: new_from}) - def reroute_edge_to(self, from_id: NodeId, old_to: NodeId, new_to: NodeId): - self.inverse_adjacency[old_to].remove(from_id) + def add_edge(self, from_id: NodeId, to_id: NodeId): ## KK 2023-05-04 Is it a problem that we append? Maybe we should make that a set - self.inverse_adjacency[new_to].append(from_id) - self.adjacency[from_id] = PartialProgramOrder.map_using_mapping(self.adjacency[from_id], - {old_to: new_to}) + self.adjacency[from_id].append(to_id) + self.inverse_adjacency[to_id].append(from_id) + def remove_edge(self, from_id: NodeId, to_id: NodeId): + self.adjacency[from_id].remove(to_id) + self.inverse_adjacency[to_id].remove(from_id) def get_transitive_closure(self, target_node_ids:"list[NodeId]") -> "list[NodeId]": all_next_transitive = set(target_node_ids) @@ -435,15 +431,22 @@ def add_to_write_set(self, node_id: NodeId, item: str): # Check if the specific command can be resolved. # KK 2023-05-04 I am not even sure what this function does and why is it useful. def cmd_can_be_resolved(self, node_id: int) -> bool: + logging.debug(f'Checking if node {node_id} can be resolved...') # If the command we evaluate has no earlier command currently executing, it can be resolved this round ## KK 2023-05-04 This does not seem correct. 
In the future (where we don't speculate everything at once) ## there might be a case where nothing is executing but a command can still not be resolved. - if len(self.get_currently_executing()) > 0: + ## + ## TODO: Think what this check needs to be exactly + currently_executing_ids = self.get_currently_executing() + if len(currently_executing_ids) > 0: + logging.debug(f' > Currently executing: {currently_executing_ids}') ## if the node is in the transitive closure of any currently executing commands, ## then we can't resolve it. total_transitive_closure = set() - for other in self.get_currently_executing(): + for other in currently_executing_ids: other_tc = set(self.get_transitive_closure([other])) + logging.debug(f' > Transitive closure of {other} is {currently_executing_ids}') + logging.debug(f' > Edges: {self.adjacency}') total_transitive_closure = total_transitive_closure.union(other_tc) ## If node_id can be reached from the other commands it can't be resolved return not node_id in total_transitive_closure @@ -596,6 +599,7 @@ def unroll_loop(self, loop_id: int): new_nodes_sinks = self.get_sub_po_sink_nodes(list(node_mappings.values())) assert(len(new_nodes_sinks) == 1) new_nodes_sink = new_nodes_sinks[0] + logging.debug(f'The sink of the new iteration for loop: {loop_id} is {new_nodes_sink}') old_nodes_sources = self.get_sub_po_source_nodes(list(node_mappings.keys())) assert(len(old_nodes_sources) == 1) @@ -604,23 +608,18 @@ def unroll_loop(self, loop_id: int): old_next_node_ids = self.get_next(new_nodes_sink) assert(len(old_next_node_ids) <= 1) - - ## Get the previous nodes of sub_po and only if one exists, reroute the edge previous_ids = self.get_sub_po_prev_nodes(loop_node_ids) + assert(len(previous_ids) <= 1) + + ## Add a new edge between the new_sink (concrete iter) and the old_source (loop po) + self.add_edge(new_nodes_sink, old_nodes_source) + + ## Remove the old previous edge of the old_source if it exists if len(previous_ids) == 1: previous_id = 
previous_ids[0] logging.debug(f'Previous node id for loop: {loop_id} is {previous_id}') - self.reroute_edge_from(old_from=previous_id, - new_from=new_nodes_sink, - to=old_nodes_source) - - ## Modify the next node of the new po - if len(old_next_node_ids) > 0: - assert(len(old_next_node_ids) == 1) - old_next_node_id = old_next_node_ids[0] - self.reroute_edge_to(old_to=old_next_node_id, - new_to=new_nodes_sink, - from_id=new_nodes_sink) + self.remove_edge(from_id=previous_id, + to_id=old_nodes_source) ## Add all new nodes to the workset (since they have to be tracked) for _, new_node_id in node_mappings.items(): From 0c7f88fa686771e4b7611b032e36fdf7c932c5b6 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 18:12:24 -0400 Subject: [PATCH 23/32] Remove an unnecessary cursed old method --- parallel-orch/partial_program_order.py | 46 +++++++++++++++++++++----- parallel-orch/scheduler_server.py | 4 +++ 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index a5f1f5a4..d29348ad 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -390,6 +390,17 @@ def get_transitive_closure(self, target_node_ids:"list[NodeId]") -> "list[NodeId all_next_transitive = all_next_transitive.union(successors) next_work.extend(new_next) return list(all_next_transitive) + + def get_inverse_transitive_closure(self, target_node_ids:"list[NodeId]") -> "list[NodeId]": + all_prev_transitive = set(target_node_ids) + next_work = target_node_ids.copy() + while len(next_work) > 0: + node_id = next_work.pop() + predecessors = set(self.get_prev(node_id)) + new_prev = predecessors - all_prev_transitive + all_prev_transitive = all_prev_transitive.union(predecessors) + next_work.extend(new_prev) + return list(all_prev_transitive) def get_transitive_closure_if_can_be_resolved(self, can_be_resolved: list, target_node_ids: list) -> list: 
all_next_transitive = set(target_node_ids) @@ -420,14 +431,6 @@ def add_to_read_set(self, node_id: NodeId, item: str): def add_to_write_set(self, node_id: NodeId, item: str): self.rw_sets[node_id].add_to_write_set(item) - # TODO: HACK delete this method ASAP - # def get_node_id_from_cmd_no_redir(self, cmd_no_redir: str) -> NodeId: - # for node_id, node in self.nodes.items(): - # if node.get_cmd_no_redir() == cmd_no_redir: - # return node_id - # assert(False) - - # Check if the specific command can be resolved. # KK 2023-05-04 I am not even sure what this function does and why is it useful. def cmd_can_be_resolved(self, node_id: int) -> bool: @@ -454,8 +457,8 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: return True def find_cmds_to_resolve(self, cmd_ids_to_check: list): - cmds_to_resolve = [] logging.debug(f" > Uncommitted commands done executing to be checked: {cmd_ids_to_check}") + cmds_to_resolve = [] for cmd_id in cmd_ids_to_check: # We check if we can resolve any possible dependencies # If we can't, we have to wait for another cycle @@ -532,6 +535,31 @@ def rerun_stopped(self): self.to_be_resolved[cmd_id] = [] self.stopped = new_stopped + ## When the frontend sends a wait for a node, it means that execution in the frontend has + ## already surpassed all nodes prior to it. This is particularly important for loops, + ## since we can't always statically predict how many iterations they will do, so the only + ## definitive way to know that they are done is to receive a wait for a node after them. + def wait_received(self, node_id: NodeId): + ## TODO: Whenever we receive a wait for a node, we always need to check and "commit" all prior loop nodes + ## since we know that they won't have any more iterations (the JIT frontend has already passed them). 
+ ## Note: This doesn't straightforwardly work for nested_loops, we need to figure out something else there + + ## Get inverse_transitive_closure to find all nodes that are before this one + inverse_tc_node_ids = self.get_inverse_transitive_closure([node_id]) + + ## TODO: Out of those nodes, filter out the non-committed loop ones + non_committed_loop_nodes_in_inverse_tc = [node_id for node_id in inverse_tc_node_ids + if not node_id in self.committed and + self.is_loop_node(node_id)] + logging.debug(f'Non committed loop nodes that are predecessors to {node_id} are: {non_committed_loop_nodes_in_inverse_tc}') + + ## TODO: And "close them" + + ## TODO: Untested (not yet covered by test) + + pass + + def find_loop_sub_partial_order(self, loop_id: int) -> "list[NodeId]": loop_node_ids = [] for node_id in self.nodes: diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 3c09bfe3..5472a675 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -101,6 +101,10 @@ def handle_wait(self, input_cmd: str, connection): ## If we are not in a loop, then the node id corresponds to the concrete node node_id = raw_node_id + ## Inform the partial order that we received a wait for a node so that it can push loops + ## forward and so on. 
+ self.partial_program_order.wait_received(node_id) + ## If the node_id is already committed, just return its exit code if node_id in self.partial_program_order.get_committed(): logging.debug(f'Node: {node_id} found in committed, responding immediately!') From f75ba6890cb24d217c2c81e7f57926fb697f3fce Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 18:25:16 -0400 Subject: [PATCH 24/32] Refactor cmd_can_be_resolved so that it is easier to work with --- parallel-orch/partial_program_order.py | 55 +++++++++++++++++++------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index d29348ad..96631d6b 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -440,21 +440,48 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: ## there might be a case where nothing is executing but a command can still not be resolved. ## ## TODO: Think what this check needs to be exactly + # currently_executing_ids = self.get_currently_executing() + ## The current problem is that after a loop iteration is done executing, there is nothing else + ## being executed, but the actual loop node (the abstract one) has not yet been closed. + # if len(currently_executing_ids) > 0: + # ## if the node is in the transitive closure of any currently executing commands, + # ## then we can't resolve it. + # total_transitive_closure = set() + # for other in currently_executing_ids: + # other_tc = set(self.get_transitive_closure([other])) + # logging.debug(f' > Transitive closure of {other} is {currently_executing_ids}') + # logging.debug(f' > Edges: {self.adjacency}') + # total_transitive_closure = total_transitive_closure.union(other_tc) + # ## If node_id can be reached from the other commands it can't be resolved + # return not node_id in total_transitive_closure + # else: + # return True + + ## Alternative check below! 
+ + ## Get inverse_transitive_closure to find all nodes that are before this one + inverse_tc_node_ids = self.get_inverse_transitive_closure([node_id]) + + ## Out of those nodes, filter out the non-committed ones + non_committed_nodes_in_inverse_tc = [node_id for node_id in inverse_tc_node_ids + if not node_id in self.committed] + logging.debug(f' > Non committed nodes that are predecessors to {node_id} are: {non_committed_nodes_in_inverse_tc}') + currently_executing_ids = self.get_currently_executing() - if len(currently_executing_ids) > 0: - logging.debug(f' > Currently executing: {currently_executing_ids}') - ## if the node is in the transitive closure of any currently executing commands, - ## then we can't resolve it. - total_transitive_closure = set() - for other in currently_executing_ids: - other_tc = set(self.get_transitive_closure([other])) - logging.debug(f' > Transitive closure of {other} is {currently_executing_ids}') - logging.debug(f' > Edges: {self.adjacency}') - total_transitive_closure = total_transitive_closure.union(other_tc) - ## If node_id can be reached from the other commands it can't be resolved - return not node_id in total_transitive_closure - else: - return True + logging.debug(f' > Currently executing: {currently_executing_ids}') + + ## TODO: Make this check more efficient + for other_node_id in inverse_tc_node_ids: + ## If one of the non-committed nodes in the inverse_tc is currently executing then + ## we can't resolve this command + if other_node_id in currently_executing_ids: + return False + + ## TODO: Add a check for loop nodes here and do not resolve if a non-committed loop exists + + ## Otherwise we can return + return True + def find_cmds_to_resolve(self, cmd_ids_to_check: list): logging.debug(f" > Uncommitted commands done executing to be checked: {cmd_ids_to_check}") From 5cbb0191b473d0a6afa6b1ef0af36fe4d9eec820 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Mon, 8 May 2023 18:45:15 -0400 Subject: [PATCH 25/32] Add 
loop nodes to committed once we receive a later wait --- parallel-orch/partial_program_order.py | 60 ++++++++++---------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 96631d6b..5801b9be 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -303,10 +303,11 @@ def valid(self): return valid1 ## Checks if loop nodes are all valid, i.e., that there are no loop nodes handled like normal ones, - ## e.g., in workset, committed, etc + ## e.g., in workset, frontier etc + ## + ## Note that loop nodes can be in the committed set (after we are done executing all iterations of a loop) def loop_nodes_valid(self): - forbidden_sets = self.get_committed() + \ - self.get_frontier() + \ + forbidden_sets = self.get_frontier() + \ self.get_workset() + \ list(self.stopped) + \ list(self.commands_currently_executing.keys()) @@ -435,30 +436,6 @@ def add_to_write_set(self, node_id: NodeId, item: str): # KK 2023-05-04 I am not even sure what this function does and why is it useful. def cmd_can_be_resolved(self, node_id: int) -> bool: logging.debug(f'Checking if node {node_id} can be resolved...') - # If the command we evaluate has no earlier command currently executing, it can be resolved this round - ## KK 2023-05-04 This does not seem correct. In the future (where we don't speculate everything at once) - ## there might be a case where nothing is executing but a command can still not be resolved. - ## - ## TODO: Think what this check needs to be exactly - # currently_executing_ids = self.get_currently_executing() - ## The current problem is that after a loop iteration is done executing, there is nothing else - ## being executed, but the actual loop node (the abstract one) has not yet been closed. 
- # if len(currently_executing_ids) > 0: - # ## if the node is in the transitive closure of any currently executing commands, - # ## then we can't resolve it. - # total_transitive_closure = set() - # for other in currently_executing_ids: - # other_tc = set(self.get_transitive_closure([other])) - # logging.debug(f' > Transitive closure of {other} is {currently_executing_ids}') - # logging.debug(f' > Edges: {self.adjacency}') - # total_transitive_closure = total_transitive_closure.union(other_tc) - # ## If node_id can be reached from the other commands it can't be resolved - # return not node_id in total_transitive_closure - # else: - # return True - - ## Alternative check below! - ## Get inverse_transitive_closure to find all nodes that are before this one inverse_tc_node_ids = self.get_inverse_transitive_closure([node_id]) @@ -474,12 +451,19 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: for other_node_id in inverse_tc_node_ids: ## If one of the non-committed nodes in the inverse_tc is currently executing then ## we can't resolve this command + ## KK 2023-05-04 This is not sufficient. In the future (where we don't speculate everything at once) + ## there might be a case where nothing is executing but a command can still not be resolved. if other_node_id in currently_executing_ids: + logging.debug(f' >> Cannot resolve {node_id}: Node {other_node_id} in non committed inverse tc is currently executing') return False - ## TODO: Add a check for loop nodes here and do not resolve if a non-committed loop exists + ## If there exists a loop node that is not committed before the command then we cannot resolve. 
+ if self.is_loop_node(other_node_id): + logging.debug(f' >> Cannot resolve {node_id}: Node {other_node_id} in non committed inverse tc is a loop node') + return False ## Otherwise we can return + logging.debug(f' >> Able to resolve {node_id}') return True @@ -567,24 +551,26 @@ def rerun_stopped(self): ## since we can't always statically predict how many iterations they will do, so the only ## definitive way to know that they are done is to receive a wait for a node after them. def wait_received(self, node_id: NodeId): - ## TODO: Whenever we receive a wait for a node, we always need to check and "commit" all prior loop nodes - ## since we know that they won't have any more iterations (the JIT frontend has already passed them). - ## Note: This doesn't straightforwardly work for nested_loops, we need to figure out something else there + ## Whenever we receive a wait for a node, we always need to check and "commit" all prior loop nodes + ## since we know that they won't have any more iterations (the JIT frontend has already passed them). 
+ ## + ## TODO: This doesn't straightforwardly work for nested_loops, we need to figure out something else there ## Get inverse_transitive_closure to find all nodes that are before this one inverse_tc_node_ids = self.get_inverse_transitive_closure([node_id]) - ## TODO: Out of those nodes, filter out the non-committed loop ones + ## Out of those nodes, filter out the non-committed loop ones non_committed_loop_nodes_in_inverse_tc = [node_id for node_id in inverse_tc_node_ids if not node_id in self.committed and self.is_loop_node(node_id)] logging.debug(f'Non committed loop nodes that are predecessors to {node_id} are: {non_committed_loop_nodes_in_inverse_tc}') - ## TODO: And "close them" - - ## TODO: Untested (not yet covered by test) - - pass + ## And "close them" + new_committed_nodes = non_committed_loop_nodes_in_inverse_tc + logging.debug(f'Adding following loop nodes to committed: {new_committed_nodes}') + self.committed = self.committed.union(set(new_committed_nodes)) + ## TODO: Add some form of validity assertion after we are done with this. + ## Just to make sure that we haven't violated the continuity of the committed set. def find_loop_sub_partial_order(self, loop_id: int) -> "list[NodeId]": From f44667195524183b82486a8592335bcc60192176 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Tue, 9 May 2023 09:58:42 -0400 Subject: [PATCH 26/32] Refactor PO --- parallel-orch/partial_program_order.py | 58 ++++++++++++++++++-------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 5801b9be..0def3725 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -169,6 +169,7 @@ def __init__(self, nodes, edges): ## from cmd_id -> CompletedNodeInfo ## Note: this dictionary does not contain information self.completed_node_info = {} + ## KK 2023-05-09 @Giorgo What is the difference of the following two? 
self.to_be_resolved = {} self.waiting_to_be_resolved = set() ## Contains the most recent sandbox directory paths @@ -432,6 +433,9 @@ def add_to_read_set(self, node_id: NodeId, item: str): def add_to_write_set(self, node_id: NodeId, item: str): self.rw_sets[node_id].add_to_write_set(item) + def add_to_waiting_to_be_resolved(self, node_id: NodeId): + self.waiting_to_be_resolved = self.waiting_to_be_resolved.union([node_id]) + # Check if the specific command can be resolved. # KK 2023-05-04 I am not even sure what this function does and why is it useful. def cmd_can_be_resolved(self, node_id: int) -> bool: @@ -466,8 +470,23 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: logging.debug(f' >> Able to resolve {node_id}') return True - - def find_cmds_to_resolve(self, cmd_ids_to_check: list): + def resolve_commands_that_can_be_resolved_and_step_forward(self): + cmds_to_resolve = self.__pop_cmds_to_resolve_from_waiting_to_be_resolved() + logging.debug(f"Commands to check for dependencies this round are: {sorted(cmds_to_resolve)}") + logging.debug(f"Commands that cannot be resolved this round are: {sorted(self.waiting_to_be_resolved)}") + + ## Resolve dependencies for the commands that can actually be resolved + to_commit = self.__resolve_dependencies_continuous_and_move_frontier(cmds_to_resolve) + if len(to_commit) == 0: + logging.debug(" > No nodes to be committed this round") + else: + logging.debug(f" > Nodes to be committed this round: {to_commit}") + logging.trace(f"Commit|"+",".join(str(node_id) for node_id in to_commit)) + self.commit_cmd_workspaces(to_commit) + # self.print_cmd_stderr(stderr) + + def __pop_cmds_to_resolve_from_waiting_to_be_resolved(self): + cmd_ids_to_check = sorted(list(self.waiting_to_be_resolved)) logging.debug(f" > Uncommitted commands done executing to be checked: {cmd_ids_to_check}") cmds_to_resolve = [] for cmd_id in cmd_ids_to_check: @@ -509,7 +528,7 @@ def resolve_dependencies(self, cmds_to_resolve): ## Resolve all the forward 
dependencies and update the workset ## Forward dependency is when a command's output is the same ## as the input of a following command - def resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): + def __resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): self.log_partial_program_order_info() logging.debug(f"Commands to be checked for dependencies: {sorted(cmds_to_resolve)}") @@ -572,6 +591,8 @@ def wait_received(self, node_id: NodeId): ## TODO: Add some form of validity assertion after we are done with this. ## Just to make sure that we haven't violated the continuity of the committed set. + ## TODO: If we added new nodes, we need to check whether there is something to be resolved here. + ## Do that first thing tomorrow def find_loop_sub_partial_order(self, loop_id: int) -> "list[NodeId]": loop_node_ids = [] @@ -698,6 +719,18 @@ def unroll_loop_node(self, node_id: NodeId): self.frontier.append(new_first_node_id) + ## KK 2023-09-05 @Giorgo Do all of these steps need to be done at once, or are these methods + ## meaningful even if called one by one? In general, I would like there to + ## be a clear set of 1-3 methods that are supposed to be used whenever we + ## add some new nodes (or progress the PO in some way) that will step it properly, + ## while being idempotent (if they are called multiple times nothing goes wrong). + ## + ## Internal functions on the other hand (ones that cannot be called on their own + ## since they might leave the PO in a partial state) should be prefixed with an + ## underscore. + ## + ## All top-level functions should get minimal arguments (none if possible) + ## and should just get their relevant state from the fields of the PO. 
def step_forward(self, old_committed): logging.debug(" > Committing frontier") self.commit_frontier() @@ -895,19 +928,11 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand return assert(node_id not in self.stopped) - cmds_to_resolve = self.find_cmds_to_resolve(sorted(list(self.waiting_to_be_resolved.union({node_id})))) - logging.debug(f"Commands to check for dependencies this round are: {sorted(cmds_to_resolve)}") - logging.debug(f"Commands that cannot be resolved this round are: {sorted(self.waiting_to_be_resolved)}") - - ## Resolve dependencies for the commands that can actually be resolved - to_commit = self.resolve_dependencies_continuous_and_move_frontier(cmds_to_resolve) - if len(to_commit) == 0: - logging.debug(" > No nodes to be committed this round") - else: - logging.debug(f" > Nodes to be committed this round: {to_commit}") - logging.trace(f"Commit|"+",".join(str(node_id) for node_id in to_commit)) - self.commit_cmd_workspaces(to_commit) - # self.print_cmd_stderr(stderr) + ## Since the command properly finished executing, it now waits to be resolved + self.add_to_waiting_to_be_resolved(node_id) + ## We can now call the general resolution method that determines which commands + ## can be resolved (all their dependencies are done executing), and resolves them. 
+ self.resolve_commands_that_can_be_resolved_and_step_forward() assert(self.valid()) def print_cmd_stderr(self, stderr): @@ -960,7 +985,6 @@ def populate_to_be_resolved_dict(self, old_committed): logging.debug(f" > Node: {node_id} is currently executing, skipping...") continue else: - ## TODO: KK Need to figure out what to do here (by adding commands after the loop) logging.debug(f" > Node: {node_id} is not executing or waiting to be resolved so we modify its set.") self.to_be_resolved[node_id] = [] traversal = [] From 03300ab785ae0b6950aa212b71220e889e72dc02 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 10 May 2023 10:56:56 -0400 Subject: [PATCH 27/32] Add a loop test --- test/test_orch.sh | 10 ++++++++++ test/test_scripts/test_loop.sh | 12 ++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 test/test_scripts/test_loop.sh diff --git a/test/test_orch.sh b/test/test_orch.sh index b84492db..637ff9ca 100755 --- a/test/test_orch.sh +++ b/test/test_orch.sh @@ -82,6 +82,7 @@ run_test() test_orch_ec=$? ## Print stderr + ## TODO: Fix this to print the stderr continuously by doing the execution checking inside pash-spec if [ $DEBUG -ge 1 ]; then cat "$stderr_file" 1>&2 fi @@ -295,6 +296,14 @@ test_stdout() $shell $2/test_stdout.sh } +test_loop() +{ + local shell=$1 + $shell $2/test_loop.sh +} + +## TODO: make more loop tests with nested loops and commands after the loop + # We run all tests composed with && to exit on the first that fails if [ "$#" -eq 0 ]; then run_test test1_1 # "1 2 2 1" @@ -323,6 +332,7 @@ if [ "$#" -eq 0 ]; then run_test test9_2 run_test test9_3 run_test test_stdout + run_test test_loop else for testname in $@ do diff --git a/test/test_scripts/test_loop.sh b/test/test_scripts/test_loop.sh new file mode 100644 index 00000000..539b3e25 --- /dev/null +++ b/test/test_scripts/test_loop.sh @@ -0,0 +1,12 @@ +echo hi +for i in 1 2 3; do + echo hi1 + sleep 1 + echo hi2 +done + +## Future loop tests must include: +## 1. 
A single loop with a single command without anything else +## 2. Multiple commands in the same loop +## 3. Nested loops +## 4. Commands before and after a loop From 6100325c1479318a1294e472d65bac5f936b033a Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 10 May 2023 10:57:07 -0400 Subject: [PATCH 28/32] Refactor step_forward --- parallel-orch/partial_program_order.py | 71 ++++++++++++++++++-------- 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index c4540d47..8c73c0b5 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -454,7 +454,7 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: logging.debug(f' > Currently executing: {currently_executing_ids}') ## TODO: Make this check more efficient - for other_node_id in inverse_tc_node_ids: + for other_node_id in non_committed_nodes_in_inverse_tc: ## If one of the non-committed nodes in the inverse_tc is currently executing then ## we can't resolve this command ## KK 2023-05-04 This is not sufficient. 
In the future (where we don't speculate everything at once) @@ -521,7 +521,10 @@ def resolve_dependencies(self, cmds_to_resolve): first_cmd_ids = sorted([cmd_id for cmd_id in self.to_be_resolved[second_cmd_id] if cmd_id not in self.stopped]) for first_cmd_id in first_cmd_ids: if second_cmd_id not in new_workset: - ## Only forward dependencies bother us now + ## We only check for forward dependencies if the first node is not a loop (abstract) node + if self.is_loop_node(first_cmd_id): + logging.debug(f' > Skipping dependency check with node {first_cmd_id} because it is a loop node') + continue if self.has_forward_dependency(first_cmd_id, second_cmd_id): logging.debug(f' > Command {second_cmd_id} was added to the workset, due to a forward dependency with {first_cmd_id}') new_workset.add(second_cmd_id) @@ -587,14 +590,23 @@ def wait_received(self, node_id: NodeId): logging.debug(f'Non committed loop nodes that are predecessors to {node_id} are: {non_committed_loop_nodes_in_inverse_tc}') ## And "close them" + ## TODO: This is a hack here, we need to have a proper method that commits + ## nodes and does whatever else is needed to do (e.g., add new nodes to frontier) new_committed_nodes = non_committed_loop_nodes_in_inverse_tc logging.debug(f'Adding following loop nodes to committed: {new_committed_nodes}') self.committed = self.committed.union(set(new_committed_nodes)) + + ## Since we committed some nodes, let's make sure that we also push the frontier + ## TODO: Can we do this using a method? + ## TODO: Add some form of validity assertion after we are done with this. ## Just to make sure that we haven't violated the continuity of the committed set. - ## TODO: If we added new nodes, we need to check whether there is something to be resolved here. 
- ## Do that first thing tomorrow + ## We check if something can be resolved and stepped forward here + ## KK 2023-05-10 This seems to work for all tests (so it might be idempotent + ## since in many tests there is nothing new to resolve after a wait) + self.resolve_commands_that_can_be_resolved_and_step_forward() + def find_loop_sub_partial_order(self, loop_id: int) -> "list[NodeId]": loop_node_ids = [] @@ -642,6 +654,7 @@ def unroll_loop(self, loop_id: int): node_mappings[node_id] = new_loop_node_id ## Create the new node self.nodes[new_loop_node_id] = Node(new_loop_node_id, node.cmd, []) + self.executions[new_loop_node_id] = 0 logging.debug(f'New loop ids: {node_mappings}') ## Create the new adjacencies, by mapping adjacencies in the node set to the new node ids @@ -689,6 +702,9 @@ def unroll_loop(self, loop_id: int): for _, new_node_id in node_mappings.items(): self.workset.append(new_node_id) + ## TODO: We need to correctly populate the resolved set of next commands + ## after unrolling the loop. + ## Return the new first node return node_mappings[old_nodes_source] @@ -733,36 +749,47 @@ def unroll_loop_node(self, node_id: NodeId): ## ## All top-level functions should get minimal arguments (none if possible) ## and should just get their relevant state from the fields of the PO. + ## TODO: step_forward seems to be an internal function def step_forward(self, old_committed): - logging.debug(" > Committing frontier") - self.commit_frontier() - logging.debug(" > Moving frontier forward") - self.move_frontier_forward() + self.frontier_commit_and_push() self.rerun_stopped() self.populate_to_be_resolved_dict(old_committed) - # Add frontier commands to committed set - ## TODO: Loop nodes should not be committed until we receive a wait for the node after them. - ## We don't know if they are done executing until then. 
- def commit_frontier(self): + ## Pushes the frontier forward a single step for all commands in it that can be committed + ## KK 2023-05-10 Should this actually push the frontier as far as possible (and not just a single step?) + ## TODO: Actually this pushes the frontier multiple steps using the update in get_next_non_speculated. + ## @Giorgo: We need to move this outside of this function. One way to do it would be with an + ## outer loop that pushes the frontier one step until there are no more changes (pseudocode below for inspiration): + ## while changes: + ## push_frontier_one_step + ## if frontier was moved: + ## changes = True + ## else: + ## changes = False + def frontier_commit_and_push(self): + logging.debug(" > Commiting and pushing frontier") + logging.debug(f' > Frontier: {self.frontier}') + new_frontier = [] # Second condition below may be unecessary - logging.debug(f'Frontier: {self.frontier}') for frontier_node in self.frontier: + ## If a node is not in the workset it means that it is actually done executing if frontier_node not in self.workset: + ## Commit the node + logging.trace(f" > Commiting node {frontier_node}") self.save_commit_state_of_cmd(frontier_node) - self.committed.update({frontier_node for frontier_node in self.frontier if frontier_node not in self.workset}) + self.committed.add(frontier_node) - def move_frontier_forward(self): - new_frontier = [] - for node in self.frontier: - if node not in self.workset: - to_add_in_frontier = self.get_next_standard_non_speculated(node) + ## Add its successors to the frontier + ## TODO: Fix the side-effectful hack in get_next_standard_non_speculated + to_add_in_frontier = self.get_next_standard_non_speculated(frontier_node) new_frontier.extend(to_add_in_frontier) logging.trace(f"FrontierAdd|{','.join(str(node_id) for node_id in to_add_in_frontier)}") - # If node is being executed again, we cannot progress further + # If node is still being executed, we cannot progress further else: - 
new_frontier.extend([node]) - logging.trace(f"FrontierAdd|{node}") + new_frontier.extend([frontier_node]) + logging.trace(f" > Not commiting node {frontier_node}, readding to frontier") + + ## Update the frontier to the new frontier self.frontier = new_frontier def get_next_standard_non_speculated(self, start: NodeId) -> "list[NodeId]": From d78fc9b4e7bbfa2c1c5488f0df3e300e42dc5c86 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 10 May 2023 11:10:49 -0400 Subject: [PATCH 29/32] Clean up a side-effectful pure function --- parallel-orch/partial_program_order.py | 90 ++++++++++---------------- 1 file changed, 34 insertions(+), 56 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 8c73c0b5..a06b972a 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -755,66 +755,44 @@ def step_forward(self, old_committed): self.rerun_stopped() self.populate_to_be_resolved_dict(old_committed) - ## Pushes the frontier forward a single step for all commands in it that can be committed - ## KK 2023-05-10 Should this actually push the frontier as far as possible (and not just a single step?) - ## TODO: Actually this pushes the frontier multiple steps using the update in get_next_non_speculated. - ## @Giorgo: We need to move this outside of this function. 
One way to do it would be with an - ## outer loop that pushes the frontier one step until there are no more changes (pseudocode below for inspiration): - ## while changes: - ## push_frontier_one_step - ## if frontier was moved: - ## changes = True - ## else: - ## changes = False + ## Pushes the frontier forward as much as possible for all commands in it that can be committed def frontier_commit_and_push(self): logging.debug(" > Commiting and pushing frontier") logging.debug(f' > Frontier: {self.frontier}') - new_frontier = [] - # Second condition below may be unecessary - for frontier_node in self.frontier: - ## If a node is not in the workset it means that it is actually done executing - if frontier_node not in self.workset: - ## Commit the node - logging.trace(f" > Commiting node {frontier_node}") - self.save_commit_state_of_cmd(frontier_node) - self.committed.add(frontier_node) - - ## Add its successors to the frontier - ## TODO: Fix the side-effectful hack in get_next_standard_non_speculated - to_add_in_frontier = self.get_next_standard_non_speculated(frontier_node) - new_frontier.extend(to_add_in_frontier) - logging.trace(f"FrontierAdd|{','.join(str(node_id) for node_id in to_add_in_frontier)}") - # If node is still being executed, we cannot progress further - else: - new_frontier.extend([frontier_node]) - logging.trace(f" > Not commiting node {frontier_node}, readding to frontier") - - ## Update the frontier to the new frontier - self.frontier = new_frontier - - def get_next_standard_non_speculated(self, start: NodeId) -> "list[NodeId]": - next_non_speculated = self.get_next_non_speculated(start) - return self.filter_standard_nodes(next_non_speculated) - - def get_next_non_speculated(self, start): - traversal_workset = self.get_next(start) - next_non_speculated = [] - while len(traversal_workset) > 0: - node_id = traversal_workset.pop() - ## KK 2023-05-04: Why is this happening in a get_next_non_speculated_traversal? 
- ## TODO: Move this outside in some effectful method. @Giorgo could you help? - if node_id not in self.get_currently_executing() \ - and node_id not in self.get_committed() \ - and node_id not in self.stopped \ - and node_id not in self.waiting_to_be_resolved \ - and node_id not in self.workset\ - and not self.is_loop_node(node_id): - self.save_commit_state_of_cmd(node_id) - self.committed.add(node_id) - traversal_workset.extend(self.get_next(node_id)) + changes_in_frontier = True + while changes_in_frontier: + new_frontier = [] + changes_in_frontier = False + # Second condition below may be unecessary + for frontier_node in self.frontier: + ## If a node is not in the workset it means that it is actually done executing + ## KK 2023-05-10 Do we need all these conditions in here? Some might be redundant? + if frontier_node not in self.get_currently_executing() \ + and frontier_node not in self.get_committed() \ + and frontier_node not in self.stopped \ + and frontier_node not in self.waiting_to_be_resolved \ + and frontier_node not in self.workset\ + and not self.is_loop_node(frontier_node): + ## Commit the node + logging.trace(f" > Commiting node {frontier_node}") + self.save_commit_state_of_cmd(frontier_node) + self.committed.add(frontier_node) + + ## Add its non-loop successors to the frontier + next_nodes = self.get_next(frontier_node) + next_standard_nodes = self.filter_standard_nodes(next_nodes) + logging.trace(f"FrontierAdd|{','.join(str(node_id) for node_id in next_standard_nodes)}") + new_frontier.extend(next_standard_nodes) + + ## There are some changes in the frontier so we need to reenter the loop + changes_in_frontier = True + # If node is still being executed, we cannot progress further else: - next_non_speculated.append(node_id) - return list(next_non_speculated) + new_frontier.extend([frontier_node]) + logging.trace(f" > Not commiting node {frontier_node}, readding to frontier") + + ## Update the frontier to the new frontier + self.frontier = 
new_frontier ## For a file - dir forward dependency to exist, From 4825e77eb7febaa2e243de5924465f9f5ab0c488 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 10 May 2023 11:31:02 -0400 Subject: [PATCH 30/32] Fix commands after loop and some more refactoring --- parallel-orch/partial_program_order.py | 60 ++++++++++++++++++-------- test/test_scripts/test_loop.sh | 1 + 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index a06b972a..c4a02e3f 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -281,9 +281,15 @@ def is_completed(self) -> bool: def get_workset(self) -> list: return self.workset - def get_committed(self) -> list: + def get_committed(self) -> set: + return copy.deepcopy(self.committed) + + def get_committed_list(self) -> list: return sorted(list(self.committed)) + def is_committed(self, node_id: NodeId) -> bool: + return node_id in self.committed + def get_frontier(self) -> list: return sorted(list(self.frontier)) @@ -343,9 +349,16 @@ def get_node_loop_context(self, node_id: NodeId) -> "list[int]": def get_all_non_committed(self) -> "list[NodeId]": all_node_ids = self.nodes.keys() non_committed_node_ids = [node_id for node_id in all_node_ids - if not node_id in self.committed] + if not self.is_committed(node_id)] return non_committed_node_ids + ## This adds a node to the committed set and saves important information + def commit_node(self, node_id: NodeId): + logging.trace(f" > Commiting node {node_id}") + self.save_commit_state_of_cmd(node_id) + self.committed.add(node_id) + + def is_loop_node(self, node_id:NodeId) -> bool: return self.get_node(node_id).in_loop() @@ -447,7 +460,7 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: ## Out of those nodes, filter out the non-committed ones non_committed_nodes_in_inverse_tc = [node_id for node_id in inverse_tc_node_ids - if not node_id in 
self.committed] + if not self.is_committed(node_id)] logging.debug(f' > Non committed nodes that are predecessors to {node_id} are: {non_committed_nodes_in_inverse_tc}') currently_executing_ids = self.get_currently_executing() @@ -549,7 +562,7 @@ def __resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): logging.trace(f"WorksetAdd|{','.join(str(cmd_id) for cmd_id in workset_diff)}") # Keep the previous committed state - old_committed = self.committed.copy() + old_committed = self.get_committed() # We want stopped commands to not enter the workset again yet assert(set(self.workset).isdisjoint(self.stopped)) @@ -585,7 +598,7 @@ def wait_received(self, node_id: NodeId): ## Out of those nodes, filter out the non-committed loop ones non_committed_loop_nodes_in_inverse_tc = [node_id for node_id in inverse_tc_node_ids - if not node_id in self.committed and + if not self.is_committed(node_id) and self.is_loop_node(node_id)] logging.debug(f'Non committed loop nodes that are predecessors to {node_id} are: {non_committed_loop_nodes_in_inverse_tc}') @@ -594,10 +607,23 @@ def wait_received(self, node_id: NodeId): ## nodes and does whatever else is needed to do (e.g., add new nodes to frontier) new_committed_nodes = non_committed_loop_nodes_in_inverse_tc logging.debug(f'Adding following loop nodes to committed: {new_committed_nodes}') - self.committed = self.committed.union(set(new_committed_nodes)) - + for node_id in new_committed_nodes: + self.commit_node(node_id) + ## Since we committed some nodes, let's make sure that we also push the frontier - ## TODO: Can we do this using a method? + ## TODO: Can we do this in a less hacky method? By using a well-defined commit_node_and_push_frontier method? 
+ if len(new_committed_nodes) > 0: + new_nodes_sinks = self.get_sub_po_sink_nodes(new_committed_nodes) + assert(len(new_nodes_sinks) == 1) + new_nodes_sink = new_nodes_sinks[0] + logging.debug(f'The sink of the newly committed loop nodes is {new_nodes_sink}') + + next_nodes = self.get_next(new_nodes_sink) + next_standard_nodes = self.filter_standard_nodes(next_nodes) + logging.trace(f"Adding its next nodes to the frontier|{','.join(str(node_id) for node_id in next_standard_nodes)}") + self.frontier.extend(next_standard_nodes) + + ## TODO: Add some form of validity assertion after we are done with this. ## Just to make sure that we haven't violated the continuity of the committed set. @@ -733,7 +759,7 @@ def unroll_loop_node(self, node_id: NodeId): ## TODO: This needs to change when we modify unrolling to happen speculatively too ## TODO: This needs to properly add the node to frontier and to resolve dictionary - self.step_forward(copy.deepcopy(self.committed)) + self.step_forward(self.get_committed()) self.frontier.append(new_first_node_id) @@ -774,9 +800,7 @@ def frontier_commit_and_push(self): and frontier_node not in self.workset\ and not self.is_loop_node(frontier_node): ## Commit the node - logging.trace(f" > Commiting node {frontier_node}") - self.save_commit_state_of_cmd(frontier_node) - self.committed.add(frontier_node) + self.commit_node(frontier_node) ## Add its non-loop successors to the frontier next_nodes = self.get_next(frontier_node) @@ -969,7 +993,7 @@ def log_rw_sets(self): def log_partial_program_order_info(self): logging.debug(f"=" * 80) logging.debug(f"WORKSET: {self.get_workset()}") - logging.debug(f"COMMITTED: {self.get_committed()}") + logging.debug(f"COMMITTED: {self.get_committed_list()}") logging.debug(f"FRONTIER: {self.get_frontier()}") logging.debug(f"EXECUTING: {list(self.commands_currently_executing.keys())}") logging.debug(f"STOPPED: {list(self.stopped)}") @@ -982,7 +1006,7 @@ def log_partial_program_order_info(self): def 
populate_to_be_resolved_dict(self, old_committed): logging.debug("Populating the resolved dictionary for all nodes") for node_id in self.nodes: - if node_id in self.committed: + if self.is_committed(node_id): logging.debug(f" > Node: {node_id} is committed, emptying its dict") self.to_be_resolved[node_id] = [] continue @@ -1002,7 +1026,7 @@ def populate_to_be_resolved_dict(self, old_committed): ## but this doesn't make sense because we are only modifying ## the to_be_resolved of currently executing commands. # relevant_committed = old_committed - relevant_committed = self.committed + relevant_committed = self.get_committed() if node_id not in relevant_committed: to_add = self.get_prev(node_id).copy() traversal = to_add.copy() @@ -1023,13 +1047,13 @@ def get_currently_executing(self) -> list: ## KK 2023-05-02 What does this function do? def save_commit_state_of_cmd(self, cmd_id): self.committed_order.append(cmd_id) - self.commit_state[cmd_id] = set(self.committed) - set(self.to_be_resolved[cmd_id]) + self.commit_state[cmd_id] = set(self.get_committed()) - set(self.to_be_resolved[cmd_id]) def log_committed_cmd_state(self): logging.info("---------- Committed Order -----------") logging.info(" " + " -> ".join(map(str, self.committed_order))) logging.info("---------- Committed State -----------") - for cmd in sorted(self.committed): + for cmd in sorted(self.get_committed_list()): if len(self.commit_state[cmd]) == 0: logging.info(f" CMD {cmd} on\t\tSTART") else: @@ -1037,7 +1061,7 @@ def log_committed_cmd_state(self): def log_executions(self): logging.debug("---------- (Re)executions ------------") - for cmd in sorted(self.committed): + for cmd in sorted(self.get_committed_list()): logging.debug(f" CMD {cmd} executed {self.executions[cmd]} times") logging.trace(f"Executions|{cmd}|{self.executions[cmd]}") logging.debug(f" Total (re)executions: {sum(list(self.executions.values()))}") diff --git a/test/test_scripts/test_loop.sh b/test/test_scripts/test_loop.sh index 
539b3e25..075b752f 100644 --- a/test/test_scripts/test_loop.sh +++ b/test/test_scripts/test_loop.sh @@ -4,6 +4,7 @@ for i in 1 2 3; do sleep 1 echo hi2 done +echo hi3 ## Future loop tests must include: ## 1. A single loop with a single command without anything else From d2352b671744292278c71e1b995a58047844f0de Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Wed, 10 May 2023 11:36:40 -0400 Subject: [PATCH 31/32] remove loop --- loop.sh | 10 ---------- 1 file changed, 10 deletions(-) delete mode 100644 loop.sh diff --git a/loop.sh b/loop.sh deleted file mode 100644 index c9be532b..00000000 --- a/loop.sh +++ /dev/null @@ -1,10 +0,0 @@ -echo hi -for i in 1 2 3 4 5; do - ## Can't do nested loops yet - # for j in 1 2; do - ## Until we figure out variables the only way to determine that this run twice is by measuring the executed time - sleep 1 - ## TODO: Manage to do multiple commands in a single loop - echo hi - # done -done From b6e7b767e145c1f7a63a973925b54ea90a0fc488 Mon Sep 17 00:00:00 2001 From: Konstantinos Kallas Date: Thu, 11 May 2023 10:21:47 -0400 Subject: [PATCH 32/32] Follow pash future --- deps/pash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/pash b/deps/pash index 003b03b9..8e48fb65 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit 003b03b962b11838ee6de2f7f59d8358ba0b3668 +Subproject commit 8e48fb6527de422eb964d9993b6f471488c8dde3