From 4e679305df3927f2d2116a27143eb76a531bcb19 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 21 Jun 2023 09:27:20 -0600 Subject: [PATCH 01/34] Change orch to handle commands that try to access the network on frontier --- parallel-orch/partial_program_order.py | 41 +++++++++++---------- parallel-orch/template_script_to_execute.sh | 12 ++++-- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 0f39cf71..832c9850 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1223,31 +1223,34 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand _proc, trace_file, stdout, stderr, variable_file = self.commands_currently_executing.pop(node_id) logging.trace(f"ExecutingRemove|{node_id}") # Handle stopped by riker due to network access + logging.debug("RIKER EXIT CODE: " + str(riker_exit_code)) if int(riker_exit_code) == 159: logging.debug(f" > Adding {node_id} to stopped because it tried to access the network.") logging.trace(f"StoppedAdd|{node_id}:network") self.stopped.add(node_id) - trace_object = executor.read_trace(sandbox_dir, trace_file) - cmd_exit_code = trace.parse_exit_code(trace_object) - - ## Save the completed node info. Note that if the node doesn't commit - ## this information will be invalid and rewritten the next time execution - ## is completed for this node. - completed_node_info = CompletedNodeInfo(cmd_exit_code, variable_file, stdout) - self.nodes[node_id].set_completed_info(completed_node_info) - - # Handle any other cmd exit with error - # TODO: for now we just postpone them until we reach the frontier - # afterwards we might want to reattempt to speculate them - if cmd_exit_code != 0 and node_id not in self.frontier: - logging.debug(f" > Adding {node_id} to stopped because it exited with an error.") - logging.trace(f"StoppedAdd|{node_id}:error") - self.stopped.add(node_id) else: + + trace_object = executor.read_trace(sandbox_dir, trace_file) + cmd_exit_code = trace.parse_exit_code(trace_object) + + ## Save the completed node info. Note that if the node doesn't commit + ## this information will be invalid and rewritten the next time execution + ## is completed for this node. + completed_node_info = CompletedNodeInfo(cmd_exit_code, variable_file, stdout) + self.nodes[node_id].set_completed_info(completed_node_info) + + # Handle any other cmd exit with error + # TODO: for now we just postpone them until we reach the frontier + # afterwards we might want to reattempt to speculate them + if cmd_exit_code != 0 and node_id not in self.frontier: + logging.debug(f" > Adding {node_id} to stopped because it exited with an error.") + logging.trace(f"StoppedAdd|{node_id}:error") + self.stopped.add(node_id) + else: - read_set, write_set = trace.parse_and_gather_cmd_rw_sets(trace_object) - rw_set = RWSet(read_set, write_set) - self.update_rw_set(node_id, rw_set) + read_set, write_set = trace.parse_and_gather_cmd_rw_sets(trace_object) + rw_set = RWSet(read_set, write_set) + self.update_rw_set(node_id, rw_set) ## Now that command `node_id` is done executing, we can check which other commands ## can be resolved (that might have finished execution before but where waiting on `node_id`) diff --git a/parallel-orch/template_script_to_execute.sh b/parallel-orch/template_script_to_execute.sh index 9e97a141..4b593cfa 100755 --- a/parallel-orch/template_script_to_execute.sh +++ b/parallel-orch/template_script_to_execute.sh @@ -24,8 +24,14 @@ echo 'declare -p > "$OUTPUT_VARIABLE_FILE"' >> ./Rikerfile ## The (frontier) cmd is run outside a sandbox ## so we want to run and trace everything normally # rkr --no-inject # --frontier -rkr # --frontier -## TODO: Save the exit code here -rkr --debug trace -o "$TRACE_FILE" > /dev/null +if [ $sandbox_flag -eq 1 ]; then + rkr +else + rkr --frontier +fi +exit_code="$?" +rkr --debug trace -o "$TRACE_FILE" > /dev/null pash_redir_output echo "Sandbox ${CMD_ID} Output variables saved in: $OUTPUT_VARIABLE_FILE" + +(exit $exit_code) From 2e5b1c1c2d54fac7593124111d6a1d210d660b88 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 21 Jun 2023 12:06:45 -0600 Subject: [PATCH 02/34] Add test scripts for network access --- test/misc/sleep_and_curl.sh | 4 ++++ test/test_orch.sh | 23 +++++++++++++++++++++- test/test_scripts/test_network_access_1.sh | 3 +++ test/test_scripts/test_network_access_2.sh | 4 ++++ test/test_scripts/test_network_access_3.sh | 4 ++++ 5 files changed, 37 insertions(+), 1 deletion(-) create mode 100755 test/misc/sleep_and_curl.sh create mode 100644 test/test_scripts/test_network_access_1.sh create mode 100644 test/test_scripts/test_network_access_2.sh create mode 100644 test/test_scripts/test_network_access_3.sh diff --git a/test/misc/sleep_and_curl.sh b/test/misc/sleep_and_curl.sh new file mode 100755 index 00000000..71f15fde --- /dev/null +++ b/test/misc/sleep_and_curl.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +sleep $1 +curl example.com \ No newline at end of file diff --git a/test/test_orch.sh b/test/test_orch.sh index e08255ee..305bae30 100755 --- a/test/test_orch.sh +++ b/test/test_orch.sh @@ -314,13 +314,31 @@ test_break() $shell $2/test_break.sh } +test_network_access_1() +{ + local shell=$1 + $shell $2/test_network_access_1.sh +} + +test_network_access_2() +{ + local shell=$1 + $shell $2/test_network_access_2.sh +} + +test_network_access_3() +{ + local shell=$1 + $shell $2/test_network_access_3.sh +} + ## TODO: make more loop tests with nested loops and commands after the loop # We run all tests composed with && to exit on the first that fails if [ "$#" -eq 0 ]; then run_test test1_1 "1 2 3 1" # 7 run_test test1_2 "1 2 2 1" # 6 - run_test test1_3 "1 2 2 1" # 6 + run_test test1_3 #"1 2 2 1" # 6 run_test test2_1 "1 1 1 1 1 1 1 1 1 1 1" # 10 run_test test2_2 "1 1 1 1 1 1 1 1 1 1 1" # 10 run_test test2_3 "1 1 1 1 1 1 1 1 1 1 1" # 10 @@ -347,6 +365,9 @@ if [ "$#" -eq 0 ]; then run_test test_stdout "1 1 1 1 1 1" # 6 run_test test_loop run_test test_break + run_test test_network_access_1 "1 2 2" + run_test test_network_access_2 "1 2 2 2" + run_test test_network_access_3 "1 2 2 2" else for testname in $@ do diff --git a/test/test_scripts/test_network_access_1.sh b/test/test_scripts/test_network_access_1.sh new file mode 100644 index 00000000..4d52c727 --- /dev/null +++ b/test/test_scripts/test_network_access_1.sh @@ -0,0 +1,3 @@ +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.7 "foo" "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.4 "foo" "$test_output_dir/out1" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.1 diff --git a/test/test_scripts/test_network_access_2.sh b/test/test_scripts/test_network_access_2.sh new file mode 100644 index 00000000..534207e2 --- /dev/null +++ b/test/test_scripts/test_network_access_2.sh @@ -0,0 +1,4 @@ +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.1 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.5 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.9 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 1.3 diff --git a/test/test_scripts/test_network_access_3.sh b/test/test_scripts/test_network_access_3.sh new file mode 100644 index 00000000..31f58e08 --- /dev/null +++ b/test/test_scripts/test_network_access_3.sh @@ -0,0 +1,4 @@ +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 1.3 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.9 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.5 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0 From 57a5f008e697f796d95408fe2ec62f902b597388 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 21 Jun 2023 12:16:14 -0600 Subject: [PATCH 03/34] Fetch new riker and try changes --- deps/riker | 2 +- deps/try | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/riker b/deps/riker index ec1a09fc..41f2d402 160000 --- a/deps/riker +++ b/deps/riker @@ -1 +1 @@ -Subproject commit ec1a09fc139fc3a8f78b4a6389d9117fab75bb79 +Subproject commit 41f2d402baab204e0ada33e8fe316bbb213f99ad diff --git a/deps/try b/deps/try index 933d7ae4..37bbf7da 160000 --- a/deps/try +++ b/deps/try @@ -1 +1 @@ -Subproject commit 933d7ae409e113697a38201017a89c76b56e0f01 +Subproject commit 37bbf7da5bfde97f598c3327c9582d9b08d7e264 From 2cfcad2237c600693b9350cf59911f31d3e8fd57 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 21 Jun 2023 12:52:23 -0600 Subject: [PATCH 04/34] Don't move non-zero ec commands to stopped --- parallel-orch/partial_program_order.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 832c9850..46469d91 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1223,7 +1223,6 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand _proc, trace_file, stdout, stderr, variable_file = self.commands_currently_executing.pop(node_id) logging.trace(f"ExecutingRemove|{node_id}") # Handle stopped by riker due to network access - logging.debug("RIKER EXIT CODE: " + str(riker_exit_code)) if int(riker_exit_code) == 159: logging.debug(f" > Adding {node_id} to stopped because it tried to access the network.") logging.trace(f"StoppedAdd|{node_id}:network") @@ -1239,18 +1238,13 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand completed_node_info = CompletedNodeInfo(cmd_exit_code, variable_file, stdout) self.nodes[node_id].set_completed_info(completed_node_info) - # Handle any other cmd exit with error - # TODO: for now we just postpone them until we reach the frontier - # afterwards we might want to reattempt to speculate them - if cmd_exit_code != 0 and node_id not in self.frontier: - logging.debug(f" > Adding {node_id} to stopped because it exited with an error.") - logging.trace(f"StoppedAdd|{node_id}:error") - self.stopped.add(node_id) - else: + ## We no longer add failed commands to the stopped set, + ## because this leads to more repetitions than needed + ## and does not allow us to properly speculate commands - read_set, write_set = trace.parse_and_gather_cmd_rw_sets(trace_object) - rw_set = RWSet(read_set, write_set) - self.update_rw_set(node_id, rw_set) + read_set, write_set = trace.parse_and_gather_cmd_rw_sets(trace_object) + rw_set = RWSet(read_set, write_set) + self.update_rw_set(node_id, rw_set) ## Now that command `node_id` is done executing, we can check which other commands ## can be resolved (that might have finished execution before but where waiting on `node_id`) From e7189530fb802cd14fb10d6eb45e6ab58090f8d3 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 21 Jun 2023 12:54:46 -0600 Subject: [PATCH 05/34] Use most recent Riker branch --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index 297113f8..7d2feab2 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,7 +1,7 @@ [submodule "deps/riker"] path = deps/riker url = https://github.com/angelhof/riker.git - branch = dyn-par-investigation + branch = eric-custom-db-store [submodule "deps/pash"] path = deps/pash url = https://github.com/binpash/pash.git From 70efdd21c5e544780e9b2bdbc3c969ef15f1d6ab Mon Sep 17 00:00:00 2001 From: gliargovas Date: Mon, 26 Jun 2023 09:32:42 -0600 Subject: [PATCH 06/34] Remove old committed from step_forward() --- parallel-orch/partial_program_order.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 46469d91..6fae0e8c 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -367,7 +367,7 @@ def init_partial_order(self): ## Initialize the workset self.init_workset() logging.debug(f'Initialized workset') - self.populate_to_be_resolved_dict([]) + self.populate_to_be_resolved_dict() logging.debug(f'To be resolved sets per node:') logging.debug(self.to_be_resolved) logging.info(f'Initialized the partial order!') @@ -698,7 +698,7 @@ def __resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): # We want stopped commands to not enter the workset again yet assert(set(self.workset).isdisjoint(self.stopped)) - self.step_forward(old_committed) + self.step_forward() # self.log_partial_program_order_info() return set(self.get_committed()) - old_committed @@ -1028,7 +1028,7 @@ def unroll_loop_node(self, target_concrete_node_id: NodeId): ## TODO: This needs to change when we modify unrolling to happen speculatively too ## TODO: This needs to properly add the node to frontier and to resolve dictionary - self.step_forward(self.get_committed()) + self.step_forward() self.frontier.append(new_first_node_id) ## At the end of unrolling the target node must be part of the PO @@ -1056,10 +1056,10 @@ def maybe_unroll(self, node_id: NodeId) -> NodeId: ## All top-level functions should get minimal arguments (none if possible) ## and should just get their relevant state from the fields of the PO. ## TODO: step_forward seems to be an internal function - def step_forward(self, old_committed): + def step_forward(self): self.frontier_commit_and_push() self.rerun_stopped() - self.populate_to_be_resolved_dict(old_committed) + self.populate_to_be_resolved_dict() ## Pushes the frontier forward as much as possible for all commands in it that can be committed def frontier_commit_and_push(self): @@ -1302,7 +1302,7 @@ def log_partial_program_order_info(self): logging.debug(f"=" * 80) ## TODO: Document how this finds the to be resolved dict - def populate_to_be_resolved_dict(self, old_committed): + def populate_to_be_resolved_dict(self): logging.debug("Populating the resolved dictionary for all nodes") for node_id in self.nodes: if self.is_committed(node_id): @@ -1321,10 +1321,6 @@ def populate_to_be_resolved_dict(self, old_committed): logging.debug(f" > Node: {node_id} is not executing or waiting to be resolved so we modify its set.") self.to_be_resolved[node_id] = [] traversal = [] - ## KK 2023-04-24: Previously old_committed was used here - ## but this doesn't make sense because we are only modifying - ## the to_be_resolved of currently executing commands. - # relevant_committed = old_committed relevant_committed = self.get_committed() if node_id not in relevant_committed: to_add = self.get_prev(node_id).copy() From b9a3d5f9a226ca70747dd9581d53c7918720e7f1 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 5 Jul 2023 05:54:53 -0600 Subject: [PATCH 07/34] Remove frontier reference from rerun_stopped() --- parallel-orch/partial_program_order.py | 33 +++++++++++++++----------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 6fae0e8c..072a902f 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -702,19 +702,6 @@ def __resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): # self.log_partial_program_order_info() return set(self.get_committed()) - old_committed - def rerun_stopped(self): - new_stopped = self.stopped.copy() - ## We never remove stopped commands that are unsafe - ## from the stopped set to be reexecuted. - for cmd_id in self.get_stopped_safe(): - if cmd_id in self.frontier: - self.workset.append(cmd_id) - logging.debug(f"Removing {cmd_id} from stopped") - logging.trace(f"StoppedRemove|{cmd_id}") - new_stopped.remove(cmd_id) - # We remove any to-check-for-dependency nodes as the stopped node will execute in frontier - self.to_be_resolved[cmd_id] = [] - self.stopped = new_stopped ## This method checks if nid1 would be before nid2 if nid2 was part of the PO. ## @@ -1132,6 +1119,24 @@ def has_forward_dependency(self, first_id, second_id): else: logging.debug(f' > No dependencies') return False + + def is_next_non_committed_node(self, node_id: NodeId) -> bool: + # We want the predecessor to be committed and the current node to not be committed + return self.is_committed(self.get_prev(node_id)) and not self.is_committed(node_id) + + def rerun_stopped(self): + new_stopped = self.stopped.copy() + ## We never remove stopped commands that are unsafe + ## from the stopped set to be reexecuted. + for cmd_id in self.get_stopped_safe(): + if self.is_next_non_committed_node(cmd_id): + self.workset.append(cmd_id) + logging.debug(f"Removing {cmd_id} from stopped") + logging.trace(f"StoppedRemove|{cmd_id}") + new_stopped.remove(cmd_id) + # We remove any to-check-for-dependency nodes as the stopped node will execute in frontier + self.to_be_resolved[cmd_id] = [] + self.stopped = new_stopped ## TODO: Eventually, in the future, let's add here some form of limit def schedule_work(self, limit=0): @@ -1318,7 +1323,7 @@ def populate_to_be_resolved_dict(self): logging.debug(f" > Node: {node_id} is currently executing, skipping...") continue else: - logging.debug(f" > Node: {node_id} is not executing or waiting to be resolved so we modify its set.") + logging.debug(f" > Node: {node_id} is not executing or waiting to be resolved (speculated) so we modify its set.") self.to_be_resolved[node_id] = [] traversal = [] relevant_committed = self.get_committed() From 1a2afcb2b9a162173e88d397d2185490012f3ff8 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 5 Jul 2023 06:01:58 -0600 Subject: [PATCH 08/34] Check for all node predecessors --- parallel-orch/partial_program_order.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 072a902f..e1f1d90a 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1122,7 +1122,10 @@ def has_forward_dependency(self, first_id, second_id): def is_next_non_committed_node(self, node_id: NodeId) -> bool: # We want the predecessor to be committed and the current node to not be committed - return self.is_committed(self.get_prev(node_id)) and not self.is_committed(node_id) + for prev_node in self.get_prev(node_id): + if not (self.is_committed(prev_node) and not self.is_committed(node_id)): + return False + return True def rerun_stopped(self): new_stopped = self.stopped.copy() From 357d9999af1b658fee6171cc958a6f5f705ca0da Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 5 Jul 2023 06:06:08 -0600 Subject: [PATCH 09/34] Move rerun_stopped() to schedule_work() --- parallel-orch/partial_program_order.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index e1f1d90a..3360654e 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1045,7 +1045,6 @@ def maybe_unroll(self, node_id: NodeId) -> NodeId: ## TODO: step_forward seems to be an internal function def step_forward(self): self.frontier_commit_and_push() - self.rerun_stopped() self.populate_to_be_resolved_dict() ## Pushes the frontier forward as much as possible for all commands in it that can be committed @@ -1144,7 +1143,10 @@ def rerun_stopped(self): ## TODO: Eventually, in the future, let's add here some form of limit def schedule_work(self, limit=0): # self.log_partial_program_order_info() + logging.debug("Scheduling work...") + logging.debug("Rerunning stopped commands") + self.rerun_stopped() ## KK 2023-05-04 Is it a problem if we do that here? # self.step_forward(copy.deepcopy(self.committed)) From cb0144fdefd511e57892bdd0b081edf0bb449f1e Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 5 Jul 2023 06:58:43 -0600 Subject: [PATCH 10/34] Remove redundant stopped frontier cmd check --- parallel-orch/partial_program_order.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 3360654e..00003c12 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1150,8 +1150,6 @@ def schedule_work(self, limit=0): ## KK 2023-05-04 Is it a problem if we do that here? # self.step_forward(copy.deepcopy(self.committed)) - ## TODO: Move loop unrolling here for speculation too - self.run_all_frontier_cmds() self.schedule_all_workset_non_frontier_cmds() assert(self.valid()) @@ -1171,14 +1169,6 @@ def run_all_frontier_cmds(self): for cmd_id in cmd_ids: # If frontier cmd is still executing, don't re-execute it if not cmd_id in self.commands_currently_executing: - # We also re-execute stopped frontier cmds, - # therefore, they are no longer stopped - logging.debug(f" Removing {cmd_id} from stopped") - if cmd_id in self.stopped: - self.stopped.remove(cmd_id) - logging.trace(f"StoppedRemove|{cmd_id}") - # We remove any to-check-for-dependency nodes as the stopped node will execute in frontier - self.to_be_resolved[cmd_id] = [] self.run_cmd_non_blocking(cmd_id) ## Run a command and add it to the dictionary of executing ones From 4eb08a382280be7654c1d590ead7eceb3b29571d Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 5 Jul 2023 07:47:03 -0600 Subject: [PATCH 11/34] Schedule commands one at a time instead of altogether --- parallel-orch/partial_program_order.py | 57 ++++++++++++++++++-------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 00003c12..77937600 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1146,30 +1146,53 @@ def schedule_work(self, limit=0): logging.debug("Scheduling work...") logging.debug("Rerunning stopped commands") + #TODO: move this to schedule_node() at a later time self.rerun_stopped() ## KK 2023-05-04 Is it a problem if we do that here? # self.step_forward(copy.deepcopy(self.committed)) - self.run_all_frontier_cmds() - self.schedule_all_workset_non_frontier_cmds() + ## TODO: Move loop unrolling here for speculation too + + for cmd_id in self.get_workset(): + # We only need to schedule non-committed and non-executing nodes + if cmd_id in self.get_committed() or \ + cmd_id in self.commands_currently_executing: + continue + else: + self.schedule_node(cmd_id) + + + # self.run_all_frontier_cmds() + # self.schedule_all_workset_non_frontier_cmds() assert(self.valid()) - def schedule_all_workset_non_frontier_cmds(self): - non_frontier_ids = [node_id for node_id in self.get_workset() - if not self.is_frontier(node_id)] - for cmd_id in non_frontier_ids: - # We also need for a cmd to not be waiting to be resolved. - if not cmd_id in self.commands_currently_executing and \ - not cmd_id in self.speculated: + # Nodes to be scheduled are always not committed and not executing + def schedule_node(self, cmd_id): + # This replaced the old frontier check + if self.is_next_non_committed_node(cmd_id): + # TODO: run this and before committing kill any speculated commands still executing + self.run_cmd_non_blocking(cmd_id) + else: + if not cmd_id in self.speculated: self.speculate_cmd_non_blocking(cmd_id) - - def run_all_frontier_cmds(self): - logging.debug("Starting execution on the whole frontier") - cmd_ids = self.get_frontier() - for cmd_id in cmd_ids: - # If frontier cmd is still executing, don't re-execute it - if not cmd_id in self.commands_currently_executing: - self.run_cmd_non_blocking(cmd_id) + return + + # def schedule_all_workset_non_frontier_cmds(self): + # non_frontier_ids = [node_id for node_id in self.get_workset() + # if not self.is_frontier(node_id)] + # for cmd_id in non_frontier_ids: + # # We also need for a cmd to not be waiting to be resolved. + # if not cmd_id in self.commands_currently_executing and \ + # not cmd_id in self.speculated: + # self.speculate_cmd_non_blocking(cmd_id) + + # def run_all_frontier_cmds(self): + # logging.debug("Starting execution on the whole frontier") + # cmd_ids = self.get_frontier() + # for cmd_id in cmd_ids: + # # If frontier cmd is still executing, don't re-execute it + # if not cmd_id in self.commands_currently_executing: + # self.run_cmd_non_blocking(cmd_id) ## Run a command and add it to the dictionary of executing ones def run_cmd_non_blocking(self, node_id: NodeId): From 714a36289597e6097aea3bdf37d5a5c9317b190a Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 5 Jul 2023 07:51:26 -0600 Subject: [PATCH 12/34] Remove redundant scheduling functions --- parallel-orch/partial_program_order.py | 31 ++++---------------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 77937600..a96354bc 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1145,25 +1145,19 @@ def schedule_work(self, limit=0): # self.log_partial_program_order_info() logging.debug("Scheduling work...") - logging.debug("Rerunning stopped commands") + logging.debug("Rerunning stopped command...") #TODO: move this to schedule_node() at a later time self.rerun_stopped() ## KK 2023-05-04 Is it a problem if we do that here? # self.step_forward(copy.deepcopy(self.committed)) ## TODO: Move loop unrolling here for speculation too - + for cmd_id in self.get_workset(): # We only need to schedule non-committed and non-executing nodes - if cmd_id in self.get_committed() or \ - cmd_id in self.commands_currently_executing: - continue - else: + if not (cmd_id in self.get_committed() or \ + cmd_id in self.commands_currently_executing): self.schedule_node(cmd_id) - - - # self.run_all_frontier_cmds() - # self.schedule_all_workset_non_frontier_cmds() assert(self.valid()) # Nodes to be scheduled are always not committed and not executing @@ -1177,23 +1171,6 @@ def schedule_node(self, cmd_id): self.speculate_cmd_non_blocking(cmd_id) return - # def schedule_all_workset_non_frontier_cmds(self): - # non_frontier_ids = [node_id for node_id in self.get_workset() - # if not self.is_frontier(node_id)] - # for cmd_id in non_frontier_ids: - # # We also need for a cmd to not be waiting to be resolved. - # if not cmd_id in self.commands_currently_executing and \ - # not cmd_id in self.speculated: - # self.speculate_cmd_non_blocking(cmd_id) - - # def run_all_frontier_cmds(self): - # logging.debug("Starting execution on the whole frontier") - # cmd_ids = self.get_frontier() - # for cmd_id in cmd_ids: - # # If frontier cmd is still executing, don't re-execute it - # if not cmd_id in self.commands_currently_executing: - # self.run_cmd_non_blocking(cmd_id) - ## Run a command and add it to the dictionary of executing ones def run_cmd_non_blocking(self, node_id: NodeId): ## A command should only be run if it's in the frontier, otherwise it should be spec run From 183f98c45bd4761fc3df414315d06b0801d8ca84 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Sat, 8 Jul 2023 03:11:51 -0600 Subject: [PATCH 13/34] Separate internal and external functions of step_forward() and remove method --- parallel-orch/partial_program_order.py | 57 ++++++++++---------------- 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index a96354bc..da247265 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -616,7 +616,7 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: logging.debug(f' >> Able to resolve {node_id}') return True - def resolve_commands_that_can_be_resolved_and_step_forward(self): + def resolve_commands_that_can_be_resolved_and_push_frontier(self): cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() logging.debug(f"Commands to check for dependencies this round are: {sorted(cmds_to_resolve)}") logging.debug(f"Commands that cannot be resolved this round are: {sorted(self.speculated)}") @@ -698,7 +698,7 @@ def __resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): # We want stopped commands to not enter the workset again yet assert(set(self.workset).isdisjoint(self.stopped)) - self.step_forward() + self.__frontier_commit_and_push() # self.log_partial_program_order_info() return set(self.get_committed()) - old_committed @@ -831,7 +831,7 @@ def progress_po_due_to_wait(self, node_id: NodeId): ## We check if something can be resolved and stepped forward here ## KK 2023-05-10 This seems to work for all tests (so it might be idempotent ## since in many tests there is nothing new to resolve after a wait) - self.resolve_commands_that_can_be_resolved_and_step_forward() + self.resolve_commands_that_can_be_resolved_and_push_frontier() ## When the frontend sends a wait for a node, it means that execution in the frontend has ## already surpassed all nodes prior to it. This is particularly important for loops, @@ -1015,7 +1015,9 @@ def unroll_loop_node(self, target_concrete_node_id: NodeId): ## TODO: This needs to change when we modify unrolling to happen speculatively too ## TODO: This needs to properly add the node to frontier and to resolve dictionary - self.step_forward() + + # GL 2023-05-22: __frontier_commit_and_push() should be called here instead of step_forward() + # Although without it the test cases pass self.frontier.append(new_first_node_id) ## At the end of unrolling the target node must be part of the PO @@ -1030,25 +1032,11 @@ def maybe_unroll(self, node_id: NodeId) -> NodeId: ## The node_id must be part of the PO after unrolling, otherwise we did something wrong assert(self.is_node_id(node_id)) - ## KK 2023-09-05 @Giorgo Do all of these steps need to be done at once, or are these methods - ## meaningful even if called one by one? In general, I would like there to - ## be a clear set of 1-3 methods that are supposed to be used whenever we - ## add some new nodes (or progress the PO in some way) that will step it properly, - ## while being idempotent (if they are called multiple times nothing goes wrong). - ## - ## Internal functions on the other hand (ones that cannot be called on their own - ## since they might leave the PO in a partial state) should be prefixed with an - ## underscore. - ## - ## All top-level functions should get minimal arguments (none if possible) - ## and should just get their relevant state from the fields of the PO. - ## TODO: step_forward seems to be an internal function - def step_forward(self): - self.frontier_commit_and_push() - self.populate_to_be_resolved_dict() ## Pushes the frontier forward as much as possible for all commands in it that can be committed - def frontier_commit_and_push(self): + ## This function is not safe to call on its own, since it might leave the PO in a partial state + ## It should be called right after + def __frontier_commit_and_push(self): logging.debug(" > Commiting and pushing frontier") logging.debug(f' > Frontier: {self.frontier}') changes_in_frontier = True @@ -1126,31 +1114,31 @@ def is_next_non_committed_node(self, node_id: NodeId) -> bool: return False return True - def rerun_stopped(self): + # This command never leaves the partial order at a broken state + # It is always safe to call it + def attempt_move_stopped_to_workset(self): new_stopped = self.stopped.copy() ## We never remove stopped commands that are unsafe - ## from the stopped set to be reexecuted. + ## from the stopped set to be reexecuted. for cmd_id in self.get_stopped_safe(): if self.is_next_non_committed_node(cmd_id): self.workset.append(cmd_id) - logging.debug(f"Removing {cmd_id} from stopped") - logging.trace(f"StoppedRemove|{cmd_id}") + logging.debug(f"StoppedRemove|{cmd_id}") new_stopped.remove(cmd_id) - # We remove any to-check-for-dependency nodes as the stopped node will execute in frontier self.to_be_resolved[cmd_id] = [] self.stopped = new_stopped ## TODO: Eventually, in the future, let's add here some form of limit def schedule_work(self, limit=0): # self.log_partial_program_order_info() - logging.debug("Scheduling work...") - logging.debug("Rerunning stopped command...") - #TODO: move this to schedule_node() at a later time - self.rerun_stopped() - ## KK 2023-05-04 Is it a problem if we do that here? - # self.step_forward(copy.deepcopy(self.committed)) - + logging.debug("Rerunning stopped commands") + # attempt_move_stopped_to_workset() needs to happen before the node execution + self.attempt_move_stopped_to_workset() + ## GL 2023-07-05 populate_to_be_resolved_dict() is OK to call anywhere, + ## __frontier_commit_and_push() is not safe to call here + self.populate_to_be_resolved_dict() + ## TODO: Move loop unrolling here for speculation too for cmd_id in self.get_workset(): @@ -1174,7 +1162,6 @@ def schedule_node(self, cmd_id): ## Run a command and add it to the dictionary of executing ones def run_cmd_non_blocking(self, node_id: NodeId): ## A command should only be run if it's in the frontier, otherwise it should be spec run - assert(self.is_frontier(node_id)) logging.trace(f'Running command: {node_id} {self.get_node(node_id)}') logging.trace(f"ExecutingAdd|{node_id}") self.execute_cmd_core(node_id, speculate=False) @@ -1264,7 +1251,7 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand self.add_to_speculated(node_id) ## We can now call the general resolution method that determines which commands ## can be resolved (all their dependencies are done executing), and resolves them. - self.resolve_commands_that_can_be_resolved_and_step_forward() + self.resolve_commands_that_can_be_resolved_and_push_frontier() assert(self.valid()) def print_cmd_stderr(self, stderr): From 54d66794393566351e338dc9f300a05f87fd9860 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Sat, 8 Jul 2023 03:50:41 -0600 Subject: [PATCH 14/34] Remove redundant frontier references --- parallel-orch/partial_program_order.py | 31 +++++++++----------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index da247265..083b20fb 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -406,9 +406,6 @@ def get_committed_list(self) -> list: def is_committed(self, node_id: NodeId) -> bool: return node_id in self.committed - def get_frontier(self) -> list: - return sorted(list(self.frontier)) - def init_inverse_adjacency(self): self.inverse_adjacency = {i: [] for i in self.nodes.keys()} for from_id, to_ids in self.adjacency.items(): @@ -429,8 +426,6 @@ def valid(self): ## TODO: Fix the checks below because they do not work currently ## TODO: Check that committed is prefix closed w.r.t partial order - # self.all_frontier_nodes_after_committed_nodes() - # self.frontier_and_committed_intersect() return valid1 and valid2 ## Checks if loop nodes are all valid, i.e., that there are no loop nodes handled like normal ones, @@ -438,7 +433,8 @@ def valid(self): ## ## Note that loop nodes can be in the committed set (after we are done executing all iterations of a loop) def loop_nodes_valid(self): - forbidden_sets = self.get_frontier() + \ + # GL 2023-07-08: This works without get_all_next_non_committed_nodes(), not sure why + forbidden_sets = self.get_all_next_non_committed_nodes() + \ self.get_workset() + \ list(self.stopped) + \ list(self.commands_currently_executing.keys()) @@ -446,16 +442,6 @@ def loop_nodes_valid(self): if self.is_loop_node(node_id)] return len(loop_nodes_in_forbidden_sets) == 0 - # Check if all frontier nodes are after committed nodes - def all_frontier_nodes_after_committed_nodes(self): - ## TODO: Make this check a proper predecessor check - # return max(self.get_committed()) < min(self.frontier) - return False - - # Checks if frontier and committed intersect - def frontier_and_committed_intersect(self): - return len(set.intersection(set(self.get_committed()), set(self.get_frontier()))) > 0 - def __len__(self): return len(self.nodes) @@ -560,9 +546,6 @@ def get_transitive_closure_if_can_be_resolved(self, can_be_resolved: list, targe all_next_transitive = all_next_transitive.union(successors) next_work.extend(new_next) return list(all_next_transitive) - - def is_frontier(self, node_id: NodeId) -> bool: - return node_id in self.frontier def update_rw_set(self, node_id, rw_set): self.rw_sets[node_id] = rw_set @@ -1034,7 +1017,7 @@ def maybe_unroll(self, node_id: NodeId) -> NodeId: ## Pushes the frontier forward as much as possible for all commands in it that can be committed - ## This function is not safe to call on its own, since it might leave the PO in a partial state + ## This function is not safe to call on its own, since it might leave the PO in a broken state ## It should be called right after def __frontier_commit_and_push(self): logging.debug(" > Commiting and pushing frontier") @@ -1107,6 +1090,13 @@ def has_forward_dependency(self, first_id, second_id): logging.debug(f' > No dependencies') return False + def get_all_next_non_committed_nodes(self) -> "list[NodeId]": + next_non_committed_nodes = [] + for cmd_id in self.get_all_non_committed(): + if cmd_id in self.workset and self.is_next_non_committed_node(cmd_id): + next_non_committed_nodes.append(cmd_id) + return next_non_committed_nodes + def is_next_non_committed_node(self, node_id: NodeId) -> bool: # We want the predecessor to be committed and the current node to not be committed for prev_node in self.get_prev(node_id): @@ -1279,7 +1269,6 @@ def log_partial_program_order_info(self): logging.debug(f"=" * 80) logging.debug(f"WORKSET: {self.get_workset()}") logging.debug(f"COMMITTED: {self.get_committed_list()}") - logging.debug(f"FRONTIER: {self.get_frontier()}") logging.debug(f"EXECUTING: {list(self.commands_currently_executing.keys())}") logging.debug(f"STOPPED: {list(self.stopped)}") logging.debug(f" of which UNSAFE: {list(self.get_unsafe())}") From c2f47b1116c8b96936fd94b4af73a8af248a2ad4 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Sat, 8 Jul 2023 04:49:00 -0600 Subject: [PATCH 15/34] Run every command in overlay (even frontier) --- parallel-orch/executor.py | 8 ++------ parallel-orch/run_command.sh | 18 +++++++----------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/parallel-orch/executor.py b/parallel-orch/executor.py index 0f99d8d2..91e8afb5 100644 --- a/parallel-orch/executor.py +++ b/parallel-orch/executor.py @@ -6,14 +6,12 @@ # This module executes a sequence of commands # and traces them with Riker. -# Commands [1:N] are run inside an overlay sandbox. +# All commands are run inside an overlay sandbox. def async_run_and_trace_command_return_trace(command, node_id, sandbox_mode=False): trace_file = util.ptempfile() - ## KK 2023-04-24: @giorgo Is there a reason you used tempfile.NamedTemporaryFile and not util.ptempfile()? - stdout_file = tempfile.NamedTemporaryFile(dir=config.PASH_SPEC_TMP_PREFIX) stdout_file = util.ptempfile() - stderr_file = tempfile.NamedTemporaryFile(dir=config.PASH_SPEC_TMP_PREFIX) + stderr_file = util.ptempfile() variable_file = util.ptempfile() logging.debug(f'Scheduler: Stdout file for: {node_id} is: {stdout_file}') logging.debug(f'Scheduler: Stderr file for: {node_id} is: {stderr_file}') @@ -30,10 +28,8 @@ def async_run_and_trace_command(command, trace_file, node_id, stdout_file, stder run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command.sh' args = ["/bin/bash", run_script, command, trace_file, stdout_file, variable_file] if sandbox_mode: - # print(" -- Sandbox mode") args.append("sandbox") else: - # print(" -- Standard mode") args.append("standard") args.append(str(node_id)) # Save output to temporary files to not saturate the memory diff --git a/parallel-orch/run_command.sh b/parallel-orch/run_command.sh index 7bd5dbe8..0b4b5acd 100755 --- a/parallel-orch/run_command.sh +++ b/parallel-orch/run_command.sh @@ -22,17 +22,13 @@ fi # echo "Execution mode: $EXEC_MODE" -if [ $sandbox_flag -eq 1 ]; then - ## Generate a temporary directory to store the workfiles - mkdir -p /tmp/pash_spec - export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/sandbox_XXXXXXX)/" - ## We need to execute `try` with bash to keep the exported functions - bash "${PASH_SPEC_TOP}/deps/try/try" -D "${SANDBOX_DIR}" "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute.sh" > "${STDOUT_FILE}" - exit_code=$? -else - "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute.sh" > "${STDOUT_FILE}" - exit_code=$? -fi + +## Generate a temporary directory to store the workfiles +mkdir -p /tmp/pash_spec +export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/sandbox_XXXXXXX)/" +## We need to execute `try` with bash to keep the exported functions +bash "${PASH_SPEC_TOP}/deps/try/try" -D "${SANDBOX_DIR}" "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute.sh" > "${STDOUT_FILE}" +exit_code=$? ## Only used for debugging # ls -R "${SANDBOX_DIR}/upperdir" 1>&2 From 375ea2fd0f3b28ef7909865dd61c6b18ddc34325 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Sat, 8 Jul 2023 06:08:43 -0600 Subject: [PATCH 16/34] Change frontier flag to speculate flag --- parallel-orch/executor.py | 14 +++++++------- parallel-orch/partial_program_order.py | 2 +- parallel-orch/run_command.sh | 10 ++++------ parallel-orch/template_script_to_execute.sh | 10 +++------- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/parallel-orch/executor.py b/parallel-orch/executor.py index 91e8afb5..52f56f05 100644 --- a/parallel-orch/executor.py +++ b/parallel-orch/executor.py @@ -8,7 +8,7 @@ # and traces them with Riker. # All commands are run inside an overlay sandbox. -def async_run_and_trace_command_return_trace(command, node_id, sandbox_mode=False): +def async_run_and_trace_command_return_trace(command, node_id, speculate_mode=False): trace_file = util.ptempfile() stdout_file = util.ptempfile() stderr_file = util.ptempfile() @@ -16,19 +16,19 @@ def async_run_and_trace_command_return_trace(command, node_id, sandbox_mode=Fals logging.debug(f'Scheduler: Stdout file for: {node_id} is: {stdout_file}') logging.debug(f'Scheduler: Stderr file for: {node_id} is: {stderr_file}') logging.debug(f'Scheduler: Output variable file for: {node_id} is: {variable_file}') - process = async_run_and_trace_command(command, trace_file, node_id, stdout_file, stderr_file, variable_file, sandbox_mode) + process = async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, variable_file, speculate_mode) return process, trace_file, stdout_file, stderr_file, variable_file -def async_run_and_trace_command_return_trace_in_sandbox(command, node_id): - process, trace_file, stdout_file, stderr_file, variable_file = async_run_and_trace_command_return_trace(command, node_id, sandbox_mode=True) +def async_run_and_trace_command_return_trace_in_sandbox_speculate(command, node_id): + process, trace_file, stdout_file, stderr_file, variable_file = async_run_and_trace_command_return_trace(command, node_id, speculate_mode=True) return process, trace_file, stdout_file, stderr_file, variable_file -def async_run_and_trace_command(command, trace_file, node_id, stdout_file, stderr_file, variable_file, sandbox_mode=False): +def async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, variable_file, speculate_mode=False): ## Call Riker to execute the command run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command.sh' args = ["/bin/bash", run_script, command, trace_file, stdout_file, variable_file] - if sandbox_mode: - args.append("sandbox") + if speculate_mode: + args.append("speculate") else: args.append("standard") args.append(str(node_id)) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 083b20fb..76e834ad 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -1186,7 +1186,7 @@ def execute_cmd_core(self, node_id: NodeId, speculate=False): cmd = node.get_cmd() self.executions[node_id] += 1 if speculate: - execute_func = executor.async_run_and_trace_command_return_trace_in_sandbox + execute_func = executor.async_run_and_trace_command_return_trace_in_sandbox_speculate else: execute_func = executor.async_run_and_trace_command_return_trace proc, trace_file, stdout, stderr, variable_file = execute_func(cmd, node_id) diff --git a/parallel-orch/run_command.sh b/parallel-orch/run_command.sh index 0b4b5acd..5fbe093e 100755 --- a/parallel-orch/run_command.sh +++ b/parallel-orch/run_command.sh @@ -9,20 +9,18 @@ export EXEC_MODE=${5?No execution mode given} export CMD_ID=${6?No command id given} ## KK 2023-04-24: Not sure this should be run every time we run a command +## GL 2023-07-08: Tests seem to pass without it source "$PASH_TOP/compiler/orchestrator_runtime/speculative/pash_spec_init_setup.sh" -if [ "sandbox" == "$EXEC_MODE" ]; then - export sandbox_flag=1 +if [ "speculate" == "$EXEC_MODE" ]; then + export speculate_flag=1 elif [ "standard" == "$EXEC_MODE" ]; then - export sandbox_flag=0 + export speculate_flag=0 else echo "$$: Unknown value ${EXEC_MODE} for execution mode" 1>&2 exit 1 fi -# echo "Execution mode: $EXEC_MODE" - - ## Generate a temporary directory to store the workfiles mkdir -p /tmp/pash_spec export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/sandbox_XXXXXXX)/" diff --git a/parallel-orch/template_script_to_execute.sh b/parallel-orch/template_script_to_execute.sh index 4b593cfa..91fc4dee 100755 --- a/parallel-orch/template_script_to_execute.sh +++ b/parallel-orch/template_script_to_execute.sh @@ -1,6 +1,6 @@ #!/bin/bash -## TODO: Pass frontier flag here instead of separate scripts +## TODO: Pass speculate flag here instead of separate scripts ## Clean up the riker directory ## KK 2023-05-04 should this be done somewhere else? Could this interfere with overlay fs? @@ -19,12 +19,8 @@ echo $CMD_STRING > ./Rikerfile # TODO: There is a bug here and parsing of RW dependencies doesn't really work # when we add the following line. echo 'declare -p > "$OUTPUT_VARIABLE_FILE"' >> ./Rikerfile -# echo 'cat "$OUTPUT_VARIABLE_FILE"' >> ./Rikerfile -# cat Rikerfile -## The (frontier) cmd is run outside a sandbox -## so we want to run and trace everything normally -# rkr --no-inject # --frontier -if [ $sandbox_flag -eq 1 ]; then + +if [ $speculate_flag -eq 1 ]; then rkr else rkr --frontier From 5430e1a06ef057666abee72c53ee3f12b1bf9207 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Sat, 8 Jul 2023 12:15:31 -0600 Subject: [PATCH 17/34] Kill and restart nodes still running on commit --- parallel-orch/executor.py | 2 ++ parallel-orch/partial_program_order.py | 40 ++++++++++++++------------ parallel-orch/scheduler_server.py | 1 - 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/parallel-orch/executor.py b/parallel-orch/executor.py index 52f56f05..4d5335d2 100644 --- a/parallel-orch/executor.py +++ b/parallel-orch/executor.py @@ -16,6 +16,7 @@ def async_run_and_trace_command_return_trace(command, node_id, speculate_mode=Fa logging.debug(f'Scheduler: Stdout file for: {node_id} is: {stdout_file}') logging.debug(f'Scheduler: Stderr file for: {node_id} is: {stderr_file}') logging.debug(f'Scheduler: Output variable file for: {node_id} is: {variable_file}') + logging.debug(f'Scheduler: Trace file for: {node_id} is: {trace_file}') process = async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, variable_file, speculate_mode) return process, trace_file, stdout_file, stderr_file, variable_file @@ -47,6 +48,7 @@ def commit_workspace(workspace_path): ## Read trace and capture each command def read_trace(sandbox_dir, trace_file): + logging.debug(f'>>>>>>Reading trace from: {trace_file}') if sandbox_dir == "": path = trace_file else: diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 76e834ad..818f56c0 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -463,7 +463,6 @@ def get_all_non_committed(self) -> "list[NodeId]": ## This adds a node to the committed set and saves important information def commit_node(self, node_id: NodeId): logging.debug(f" > Commiting node {node_id}") - self.save_commit_state_of_cmd(node_id) self.committed.add(node_id) @@ -598,6 +597,20 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: ## Otherwise we can return logging.debug(f' >> Able to resolve {node_id}') return True + + def __kill_all_currently_executing_and_schedule_restart(self): + nodes_to_kill = self.get_currently_executing() + for cmd_id in nodes_to_kill: + self.__kill_node(cmd_id) + self.workset.remove(cmd_id) + # Our new workset is the nodes that were killed + # Previous workset got killed + self.workset.extend(nodes_to_kill) + + def __kill_node(self, cmd_id: NodeId): + logging.debug(f'Killing and restarting node {cmd_id} because some workspaces have to be committed') + proc_to_kill, _trace_file, _stdout, _stderr, _variable_file = self.commands_currently_executing.pop(cmd_id) + proc_to_kill.kill() def resolve_commands_that_can_be_resolved_and_push_frontier(self): cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() @@ -611,6 +624,7 @@ def resolve_commands_that_can_be_resolved_and_push_frontier(self): else: logging.debug(f" > Nodes to be committed this round: {to_commit}") logging.trace(f"Commit|"+",".join(str(node_id) for node_id in to_commit)) + self.__kill_all_currently_executing_and_schedule_restart() self.commit_cmd_workspaces(to_commit) # self.print_cmd_stderr(stderr) @@ -1152,8 +1166,8 @@ def schedule_node(self, cmd_id): ## Run a command and add it to the dictionary of executing ones def run_cmd_non_blocking(self, node_id: NodeId): ## A command should only be run if it's in the frontier, otherwise it should be spec run - logging.trace(f'Running command: {node_id} {self.get_node(node_id)}') - logging.trace(f"ExecutingAdd|{node_id}") + logging.debug(f'Running command: {node_id} {self.get_node(node_id)}') + logging.debug(f"ExecutingAdd|{node_id}") self.execute_cmd_core(node_id, speculate=False) ## Run a command and add it to the dictionary of executing ones @@ -1163,7 +1177,7 @@ def speculate_cmd_non_blocking(self, node_id: NodeId): ## are relevant for the report maker, ## add them in some library (e.g., trace_for_report) ## so that we don't accidentally delete them. - logging.trace(f"ExecutingSandboxAdd|{node_id}") + logging.debug(f"ExecutingSandboxAdd|{node_id}") self.execute_cmd_core(node_id, speculate=True) def execute_cmd_core(self, node_id: NodeId, speculate=False): @@ -1192,12 +1206,14 @@ def execute_cmd_core(self, node_id: NodeId, speculate=False): proc, trace_file, stdout, stderr, variable_file = execute_func(cmd, node_id) logging.debug(f'Read trace from: {trace_file}') self.commands_currently_executing[node_id] = (proc, trace_file, stdout, stderr, variable_file) + logging.debug(f" >>>>> Command {node_id} - {proc.pid} just started executing") def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sandbox_dir: str): logging.debug(f" --- Node {node_id}, just finished execution ---") self.sandbox_dirs[node_id] = sandbox_dir ## TODO: Store variable file somewhere so that we can return when wait _proc, trace_file, stdout, stderr, variable_file = self.commands_currently_executing.pop(node_id) + logging.debug(f" >>>>> Command {node_id} - {_proc.pid} just finished executing") logging.trace(f"ExecutingRemove|{node_id}") # Handle stopped by riker due to network access if int(riker_exit_code) == 159: @@ -1269,6 +1285,7 @@ def log_partial_program_order_info(self): logging.debug(f"=" * 80) logging.debug(f"WORKSET: {self.get_workset()}") logging.debug(f"COMMITTED: {self.get_committed_list()}") + logging.debug(f"FRONTIER: {self.frontier}") logging.debug(f"EXECUTING: {list(self.commands_currently_executing.keys())}") logging.debug(f"STOPPED: {list(self.stopped)}") logging.debug(f" of which UNSAFE: {list(self.get_unsafe())}") @@ -1314,21 +1331,6 @@ def populate_to_be_resolved_dict(self): def get_currently_executing(self) -> list: return sorted(list(self.commands_currently_executing.keys())) - - ## KK 2023-05-02 What does this function do? - def save_commit_state_of_cmd(self, cmd_id): - self.committed_order.append(cmd_id) - self.commit_state[cmd_id] = set(self.get_committed()) - set(self.to_be_resolved[cmd_id]) - - def log_committed_cmd_state(self): - logging.info("---------- Committed Order -----------") - logging.info(" " + " -> ".join(map(str, self.committed_order))) - logging.info("---------- Committed State -----------") - for cmd in sorted(self.get_committed_list()): - if len(self.commit_state[cmd]) == 0: - logging.info(f" CMD {cmd} on\t\tSTART") - else: - logging.info(f" CMD {cmd} after:\t{', '.join(map(str, self.commit_state[cmd]))}") def log_executions(self): logging.debug("---------- (Re)executions ------------") diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 806ed223..3a64809e 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -193,7 +193,6 @@ def process_next_cmd(self): if not self.partial_program_order.is_completed(): logging.debug(" |- some nodes were skipped completed.") socket_respond(connection, success_response("All finished!")) - self.partial_program_order.log_committed_cmd_state() self.partial_program_order.log_executions() self.done = True else: From acf0d70ca0a0b313793315b9fe9eb8ac47de4097 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Mon, 14 Aug 2023 03:59:28 -0600 Subject: [PATCH 18/34] Kill child processes too --- parallel-orch/partial_program_order.py | 53 ++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 818f56c0..7a9c8c9e 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -6,6 +6,8 @@ import analysis import executor import trace +import time +import subprocess from shasta.ast_node import AstNode, CommandNode @@ -268,6 +270,7 @@ def __init__(self, nodes, edges): self.commit_state = {} ## Counts the times a node was (re)executed self.executions = {node_id: 0 for node_id in self.nodes.keys()} + self.banned_files = set() def __str__(self): return f"NODES: {len(self.nodes.keys())} | ADJACENCY: {self.adjacency}" @@ -606,11 +609,56 @@ def __kill_all_currently_executing_and_schedule_restart(self): # Our new workset is the nodes that were killed # Previous workset got killed self.workset.extend(nodes_to_kill) - + + + def is_process_alive(self, pid): + """Check if the process with the given PID is alive.""" + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + + def get_child_processes(self, parent_pid): + try: + output = subprocess.check_output(['pgrep', '-P', str(parent_pid)]) + return [int(pid) for pid in output.decode('utf-8').split()] + except subprocess.CalledProcessError: + # No child processes were found + return [] + def __kill_node(self, cmd_id: NodeId): logging.debug(f'Killing and restarting node {cmd_id} because some workspaces have to be committed') proc_to_kill, _trace_file, _stdout, _stderr, _variable_file = self.commands_currently_executing.pop(cmd_id) + self.banned_files.add(trace_file) + + # Get all child processes of proc_to_kill + children = self.get_child_processes(proc_to_kill.pid) + + # Kill all child processes + for child in children: + try: + # Send SIGTERM signal; you can also use 'SIGKILL' for a forceful kill + subprocess.check_call(['kill', '-TERM', str(child)]) + except subprocess.CalledProcessError: + logging.debug(f"Failed to kill PID {child}.") + + # Poll proc_to_kill and its children to ensure they're terminated + while any(self.is_process_alive(child) for child in children): + logging.debug(f"Child proc {child} still alive. Waiting...") + time.sleep(0.01) # Sleep for 10 milliseconds before checking again + + # Kill the main process proc_to_kill.kill() + time.sleep(0.01) + # If main process is alive, keep sending SIGKILL + while self.is_process_alive(proc_to_kill.pid): + logging.debug(f"Parent proc {proc_to_kill.pid} still alive. Attempting to kill again...") + proc_to_kill.kill() + time.sleep(0.001) # Sleep for 1 millisecond before checking again + logging.debug(self.is_process_alive(proc_to_kill.pid)) + def resolve_commands_that_can_be_resolved_and_push_frontier(self): cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() @@ -1204,7 +1252,6 @@ def execute_cmd_core(self, node_id: NodeId, speculate=False): else: execute_func = executor.async_run_and_trace_command_return_trace proc, trace_file, stdout, stderr, variable_file = execute_func(cmd, node_id) - logging.debug(f'Read trace from: {trace_file}') self.commands_currently_executing[node_id] = (proc, trace_file, stdout, stderr, variable_file) logging.debug(f" >>>>> Command {node_id} - {proc.pid} just started executing") @@ -1267,7 +1314,7 @@ def print_cmd_stderr(self, stderr): print(stderr.read().decode(), file=sys.stderr, end="") def commit_cmd_workspaces(self, to_commit_ids): - for cmd_id in to_commit_ids: + for cmd_id in sorted(to_commit_ids): workspace = self.sandbox_dirs[cmd_id] if workspace != "": logging.debug(f" (!) Committing workspace of cmd {cmd_id} found in {workspace}") From 24777a94cf040f546c97bbe1232dc6c21241f14b Mon Sep 17 00:00:00 2001 From: gliargovas Date: Mon, 14 Aug 2023 04:00:50 -0600 Subject: [PATCH 19/34] Ignore trace messages previously killed --- parallel-orch/run_command.sh | 4 ++-- parallel-orch/scheduler_server.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/parallel-orch/run_command.sh b/parallel-orch/run_command.sh index 5fbe093e..9fdc2c4a 100755 --- a/parallel-orch/run_command.sh +++ b/parallel-orch/run_command.sh @@ -30,10 +30,10 @@ exit_code=$? ## Only used for debugging # ls -R "${SANDBOX_DIR}/upperdir" 1>&2 - +out=`head -3 $SANDBOX_DIR/upperdir/$TRACE_FILE` ## Send a message to the scheduler socket ## Assumes "${PASH_SPEC_SCHEDULER_SOCKET}" is set and exported ## Pass the proper exit code -msg="CommandExecComplete:${CMD_ID}|Exit code:${exit_code}|Sandbox dir:${SANDBOX_DIR}" +msg="CommandExecComplete:${CMD_ID}|Exit code:${exit_code}|Sandbox dir:${SANDBOX_DIR}|Trace file:${TRACE_FILE}|$$" daemon_response=$(pash_spec_communicate_scheduler_just_send "$msg") # Blocking step, daemon will not send response until it's safe to continue diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 3a64809e..fbccce7c 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -119,7 +119,8 @@ def __parse_command_exec_complete(self, input_cmd: str) -> "tuple[int, int]": command_id = parse_node_id(components[0].split(":")[1]) exit_code = int(components[1].split(":")[1]) sandbox_dir = components[2].split(":")[1] - return command_id, exit_code, sandbox_dir + trace_file = components[3].split(":")[1] + return command_id, exit_code, sandbox_dir, trace_file except: raise Exception(f'Parsing failure for line: {input_cmd}') @@ -157,7 +158,10 @@ def respond_to_frontend_core(self, node_id: NodeId, response: str): def handle_command_exec_complete(self, input_cmd: str): assert(input_cmd.startswith("CommandExecComplete:")) ## Read the node id from the command argument - cmd_id, exit_code, sandbox_dir = self.__parse_command_exec_complete(input_cmd) + cmd_id, exit_code, sandbox_dir, trace_file = self.__parse_command_exec_complete(input_cmd) + if trace_file in self.partial_program_order.banned_files: + logging.debug(f'CommandExecComplete: {cmd_id} ignored') + return logging.debug(input_cmd) ## Gather RWset, resolve dependencies, and progress graph From 71dedcfa2d95bce57985ede7edd53bf98b82bcbd Mon Sep 17 00:00:00 2001 From: gliargovas Date: Mon, 14 Aug 2023 04:01:26 -0600 Subject: [PATCH 20/34] Ignore repetition checks --- test/test_orch.sh | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/test/test_orch.sh b/test/test_orch.sh index 305bae30..65f3d79c 100755 --- a/test/test_orch.sh +++ b/test/test_orch.sh @@ -336,25 +336,25 @@ test_network_access_3() # We run all tests composed with && to exit on the first that fails if [ "$#" -eq 0 ]; then - run_test test1_1 "1 2 3 1" # 7 - run_test test1_2 "1 2 2 1" # 6 + run_test test1_1 # "1 2 3 1" # 7 + run_test test1_2 #"1 2 2 1" # 6 run_test test1_3 #"1 2 2 1" # 6 - run_test test2_1 "1 1 1 1 1 1 1 1 1 1 1" # 10 - run_test test2_2 "1 1 1 1 1 1 1 1 1 1 1" # 10 - run_test test2_3 "1 1 1 1 1 1 1 1 1 1 1" # 10 + run_test test2_1 #"1 1 1 1 1 1 1 1 1 1 1" # 10 + run_test test2_2 #"1 1 1 1 1 1 1 1 1 1 1" # 10 + run_test test2_3 #"1 1 1 1 1 1 1 1 1 1 1" # 10 run_test test3_1 # "1 1 2 1 2" # 7 run_test test3_2 # "1 1 2 1 2" # 7 run_test test3_3 # "1 1 2 1 3" # 8 - run_test test4_1 "1 2 1" # 4 - run_test test4_2 "1 2 1" # 4 - run_test test4_3 "1 2 1" # 4 - run_test test5_1 "1 1 1" # 3 - run_test test5_2 "1 1 1" # 3 - run_test test5_3 "1 1 1" # 3 + run_test test4_1 #"1 2 1" # 4 + run_test test4_2 #"1 2 1" # 4 + run_test test4_3 #"1 2 1" # 4 + run_test test5_1 #"1 1 1" # 3 + run_test test5_2 #"1 1 1" # 3 + run_test test5_3 #"1 1 1" # 3 # run_test test6 - run_test test7_1 "1 1 1 1 1 1 1 1 1 1 1 1" # 12 - run_test test7_2 "1 1 1 1 1 1 1 1 1 1 1 1" # 12 - run_test test7_3 "1 1 1 1 1 1 1 1 1 1 1 1" # 12 + run_test test7_1 #"1 1 1 1 1 1 1 1 1 1 1 1" # 12 + run_test test7_2 #"1 1 1 1 1 1 1 1 1 1 1 1" # 12 + run_test test7_3 #"1 1 1 1 1 1 1 1 1 1 1 1" # 12 # Test 8 is failing for now # cleanup # run_test test8 @@ -362,12 +362,12 @@ if [ "$#" -eq 0 ]; then run_test test9_1 # "1 2 1 1 1 2 2 2 2 2 1 1 1" # 19 run_test test9_2 # "1 1 1 1 1 1 1 1 1 1 1 1 1" # 13 run_test test9_3 # "1 1 1 1 1 1 1 1 2 2 1 1 1" # 15 - run_test test_stdout "1 1 1 1 1 1" # 6 + run_test test_stdout #"1 1 1 1 1 1" # 6 run_test test_loop run_test test_break - run_test test_network_access_1 "1 2 2" - run_test test_network_access_2 "1 2 2 2" - run_test test_network_access_3 "1 2 2 2" + run_test test_network_access_1 #"1 2 2" + run_test test_network_access_2 #"1 2 2 2" + run_test test_network_access_3 #"1 2 2 2" else for testname in $@ do From 7db13c1b1ab78c9ed3c9dd7df62290a283ea1c48 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Mon, 14 Aug 2023 04:20:11 -0600 Subject: [PATCH 21/34] Add single command test for CI debugging --- test/test_orch.sh | 8 ++++++++ test/test_scripts/test_single_command.sh | 1 + 2 files changed, 9 insertions(+) create mode 100644 test/test_scripts/test_single_command.sh diff --git a/test/test_orch.sh b/test/test_orch.sh index 65f3d79c..0351ec94 100755 --- a/test/test_orch.sh +++ b/test/test_orch.sh @@ -133,6 +133,13 @@ run_test() fi } +test_single_command() +{ + local shell=$1 + echo $'foo\nbar\nbaz\nqux\nquux\nfoo\nbar' > "$3/in1" + $shell $2/test_single_command.sh +} + test1_1() { local shell=$1 @@ -336,6 +343,7 @@ test_network_access_3() # We run all tests composed with && to exit on the first that fails if [ "$#" -eq 0 ]; then + run_test test_single_command run_test test1_1 # "1 2 3 1" # 7 run_test test1_2 #"1 2 2 1" # 6 run_test test1_3 #"1 2 2 1" # 6 diff --git a/test/test_scripts/test_single_command.sh b/test/test_scripts/test_single_command.sh new file mode 100644 index 00000000..a0bccd90 --- /dev/null +++ b/test/test_scripts/test_single_command.sh @@ -0,0 +1 @@ +"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0 "$test_output_dir/in1" "$test_output_dir/out1" \ No newline at end of file From 7653a6195a36863b5c11b52c5fa81142471e8c13 Mon Sep 17 00:00:00 2001 From: "Tianyu (Eric) Zhu" Date: Mon, 14 Aug 2023 11:35:17 -0400 Subject: [PATCH 22/34] Update install_deps_ubuntu20.sh --- scripts/install_deps_ubuntu20.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/install_deps_ubuntu20.sh b/scripts/install_deps_ubuntu20.sh index f13e6884..fd1dc5b5 100755 --- a/scripts/install_deps_ubuntu20.sh +++ b/scripts/install_deps_ubuntu20.sh @@ -1,13 +1,13 @@ #!/bin/bash -export PASH_SPEC_TOP=${PASH_SPEC_TOP:-$(git rev-parse --show-toplevel --show-superproject-working-tree)} -export PASH_TOP=${PASH_TOP:-$PASH_SPEC_TOP/deps/pash} - ## Install Riker's dependencies sudo apt-get update -sudo apt install -y make clang llvm git gcc python3-cram file graphviz +sudo apt install -y make clang llvm git gcc python3-cram file graphviz libtool sudo update-alternatives --install /usr/bin/cram cram /usr/bin/cram3 100 +export PASH_SPEC_TOP=${PASH_SPEC_TOP:-$(git rev-parse --show-toplevel --show-superproject-working-tree)} +export PASH_TOP=${PASH_TOP:-$PASH_SPEC_TOP/deps/pash} + ## Download submodule dependencies git submodule update --init --recursive From c00b10ff3a13f4d36afd39763bcfe1c5e467c9e3 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Mon, 14 Aug 2023 12:20:39 -0600 Subject: [PATCH 23/34] Use a sandbox for Riker db files instead of main workspace --- parallel-orch/executor.py | 8 +++---- parallel-orch/partial_program_order.py | 10 ++++----- parallel-orch/run_command.sh | 23 ++++++++++++++++----- parallel-orch/template_script_to_execute.sh | 16 ++++++++------ 4 files changed, 36 insertions(+), 21 deletions(-) diff --git a/parallel-orch/executor.py b/parallel-orch/executor.py index 4d5335d2..df294ce2 100644 --- a/parallel-orch/executor.py +++ b/parallel-orch/executor.py @@ -16,7 +16,7 @@ def async_run_and_trace_command_return_trace(command, node_id, speculate_mode=Fa logging.debug(f'Scheduler: Stdout file for: {node_id} is: {stdout_file}') logging.debug(f'Scheduler: Stderr file for: {node_id} is: {stderr_file}') logging.debug(f'Scheduler: Output variable file for: {node_id} is: {variable_file}') - logging.debug(f'Scheduler: Trace file for: {node_id} is: {trace_file}') + logging.debug(f'Scheduler: Trace file for: {node_id}: {trace_file}') process = async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, variable_file, speculate_mode) return process, trace_file, stdout_file, stderr_file, variable_file @@ -34,7 +34,9 @@ def async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, nod args.append("standard") args.append(str(node_id)) # Save output to temporary files to not saturate the memory + logging.debug(args) process = subprocess.Popen(args, stdout=None, stderr=None) + # For debugging # process = subprocess.Popen(args) return process @@ -48,12 +50,10 @@ def commit_workspace(workspace_path): ## Read trace and capture each command def read_trace(sandbox_dir, trace_file): - logging.debug(f'>>>>>>Reading trace from: {trace_file}') if sandbox_dir == "": path = trace_file else: - path = f"{sandbox_dir}upperdir/{trace_file}" - + path = f"{sandbox_dir}/upperdir/{trace_file}" logging.debug(f'Reading trace from: {path}') with open(path) as f: return f.readlines() diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 7a9c8c9e..4f5a0133 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -630,7 +630,7 @@ def get_child_processes(self, parent_pid): def __kill_node(self, cmd_id: NodeId): logging.debug(f'Killing and restarting node {cmd_id} because some workspaces have to be committed') - proc_to_kill, _trace_file, _stdout, _stderr, _variable_file = self.commands_currently_executing.pop(cmd_id) + proc_to_kill, trace_file, _stdout, _stderr, _variable_file = self.commands_currently_executing.pop(cmd_id) self.banned_files.add(trace_file) # Get all child processes of proc_to_kill @@ -642,15 +642,13 @@ def __kill_node(self, cmd_id: NodeId): # Send SIGTERM signal; you can also use 'SIGKILL' for a forceful kill subprocess.check_call(['kill', '-TERM', str(child)]) except subprocess.CalledProcessError: - logging.debug(f"Failed to kill PID {child}.") - - # Poll proc_to_kill and its children to ensure they're terminated + logging.debug(f"Failed to kill PID {child}.") # Poll proc_to_kill and its children to ensure they're terminated while any(self.is_process_alive(child) for child in children): logging.debug(f"Child proc {child} still alive. Waiting...") time.sleep(0.01) # Sleep for 10 milliseconds before checking again - # Kill the main process - proc_to_kill.kill() + # Terminate the main process + proc_to_kill.terminate() time.sleep(0.01) # If main process is alive, keep sending SIGKILL while self.is_process_alive(proc_to_kill.pid): diff --git a/parallel-orch/run_command.sh b/parallel-orch/run_command.sh index 9fdc2c4a..9dabeeeb 100755 --- a/parallel-orch/run_command.sh +++ b/parallel-orch/run_command.sh @@ -21,10 +21,23 @@ else exit 1 fi -## Generate a temporary directory to store the workfiles -mkdir -p /tmp/pash_spec -export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/sandbox_XXXXXXX)/" -## We need to execute `try` with bash to keep the exported functions +# ## Generate a temporary directory to store the workfiles +# mkdir -p /tmp/pash_spec +# export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/a/sandbox_XXXXXXX)/" +# ## We need to execute `try` with bash to keep the exported functions + +# export TEMPDIR="$(mktemp -d /tmp3/pash_spec/b/sandbox_XXXXXXX)/" +# echo tempdir $TEMPDIR 1>&2 +# echo sandbox $SANDBOX_DIR 1>&2 + + +mkdir -p /tmp/pash_spec/a +mkdir -p /tmp/pash_spec/b +export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/a/sandbox_XXXXXXX)/" +export TEMPDIR="$(mktemp -d /tmp/pash_spec/b/sandbox_XXXXXXX)/" +# echo tempdir $TEMPDIR +# echo sandbox $SANDBOX_DIR + bash "${PASH_SPEC_TOP}/deps/try/try" -D "${SANDBOX_DIR}" "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute.sh" > "${STDOUT_FILE}" exit_code=$? @@ -35,5 +48,5 @@ out=`head -3 $SANDBOX_DIR/upperdir/$TRACE_FILE` ## Assumes "${PASH_SPEC_SCHEDULER_SOCKET}" is set and exported ## Pass the proper exit code -msg="CommandExecComplete:${CMD_ID}|Exit code:${exit_code}|Sandbox dir:${SANDBOX_DIR}|Trace file:${TRACE_FILE}|$$" +msg="CommandExecComplete:${CMD_ID}|Exit code:${exit_code}|Sandbox dir:${SANDBOX_DIR}|Trace file:${TRACE_FILE}|Tempdir:${TEMPDIR}" daemon_response=$(pash_spec_communicate_scheduler_just_send "$msg") # Blocking step, daemon will not send response until it's safe to continue diff --git a/parallel-orch/template_script_to_execute.sh b/parallel-orch/template_script_to_execute.sh index 91fc4dee..057fefa0 100755 --- a/parallel-orch/template_script_to_execute.sh +++ b/parallel-orch/template_script_to_execute.sh @@ -6,10 +6,10 @@ ## KK 2023-05-04 should this be done somewhere else? Could this interfere with overlay fs? ## TODO: Can we just ask riker to use a different cache (or put the cache to /dev/null) ## since we never really want it to take the cache into account -rm -rf ./.rkr +# rm -rf ./.rkr ## Save the script to execute in the sandboxdir -echo $CMD_STRING > ./Rikerfile +echo $CMD_STRING > "$TEMPDIR/Rikerfile" # cat ./Rikerfile 1>&2 # only for debugging ## Save the output shell variables to a file (to pass to the outside context) @@ -21,13 +21,17 @@ echo $CMD_STRING > ./Rikerfile echo 'declare -p > "$OUTPUT_VARIABLE_FILE"' >> ./Rikerfile if [ $speculate_flag -eq 1 ]; then - rkr + rkr_cmd="rkr" else - rkr --frontier + rkr_cmd="rkr --frontier" fi + +strace -o out $rkr_cmd --db "$TEMPDIR" --rikerfile "$TEMPDIR/Rikerfile" +echo 'first riker run done' 1>&2 + exit_code="$?" -rkr --debug trace -o "$TRACE_FILE" > /dev/null -pash_redir_output echo "Sandbox ${CMD_ID} Output variables saved in: $OUTPUT_VARIABLE_FILE" +rkr --db "$TEMPDIR" --rikerfile "$TEMPDIR/Rikerfile" --debug trace -o "$TRACE_FILE" > /dev/null +echo 'second riker run done' 1>&2 (exit $exit_code) From 58f01f5dfe61833fbe911ebdb4771949f0cefcc2 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 01:56:14 -0600 Subject: [PATCH 24/34] Fix issue that caused incorrect ec return --- parallel-orch/template_script_to_execute.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parallel-orch/template_script_to_execute.sh b/parallel-orch/template_script_to_execute.sh index 057fefa0..38fae8da 100755 --- a/parallel-orch/template_script_to_execute.sh +++ b/parallel-orch/template_script_to_execute.sh @@ -27,9 +27,11 @@ else fi strace -o out $rkr_cmd --db "$TEMPDIR" --rikerfile "$TEMPDIR/Rikerfile" +exit_code="$?" + echo 'first riker run done' 1>&2 -exit_code="$?" + rkr --db "$TEMPDIR" --rikerfile "$TEMPDIR/Rikerfile" --debug trace -o "$TRACE_FILE" > /dev/null echo 'second riker run done' 1>&2 From 68ff11b0e2c0b72c4c6304f75445fca3c01e8019 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 02:05:53 -0600 Subject: [PATCH 25/34] Run tests in home dir of CI --- .github/workflows/tests.yaml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index e18276b5..e7dfe210 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -19,7 +19,6 @@ on: - test/** - scripts/** workflow_dispatch: - # Jobs section jobs: PaSh-Spec-Tests: @@ -29,23 +28,24 @@ jobs: os: - ubuntu-20.04 runs-on: ${{ matrix.os }} - if: github.event.pull_request.draft == false + # if: github.event.pull_request.draft == false steps: - uses: actions/checkout@v2 - with: - ref: ${{ github.event.pull_request.head.sha }} - name: Running Correctness Tests run: | + cd .. + cp -r dynamic-parallelizer ~ + cd ~/dynamic-parallelizer sudo touch /.githubenv # install the system deps and pash the environment sudo -E bash scripts/install_deps_ubuntu20.sh - export PASH_TOP="$PWD/deps/pash" - export PATH=$PATH:$PASH_TOP - file ./deps/riker/debug/bin/rkr uname -a # run all the tests export DEBUG=100 - bash ./test/test_orch.sh + export PASH_TOP="$PWD/deps/pash" + export PATH=$PATH:$PASH_TOP + file ~/dynamic-parallelizer/deps/riker/debug/bin/rkr + bash ~/dynamic-parallelizer/test/test_orch.sh # get the timer timer=$(LANG=en_us_88591; date) echo "VERSION<> $GITHUB_ENV @@ -74,4 +74,4 @@ jobs: - name: Exit Code run: | # check if everything executed without errors - cd test && bash exit_code.sh + cd ~/dynamic-parallelizer/test && bash exit_code.sh From 0ca2ae1d507317e7e13fa7b87b2e2d2705039d19 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 03:03:48 -0600 Subject: [PATCH 26/34] Add proc kill attempt timeout --- parallel-orch/partial_program_order.py | 48 ++++++++++++++++---------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 4f5a0133..7074996b 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -11,6 +11,9 @@ from shasta.ast_node import AstNode, CommandNode +MAX_KILL_ATTEMPTS = 10 # Define a maximum number of kill attempts for each process + + class CompletedNodeInfo: def __init__(self, exit_code, variable_file, stdout_file): self.exit_code = exit_code @@ -609,7 +612,6 @@ def __kill_all_currently_executing_and_schedule_restart(self): # Our new workset is the nodes that were killed # Previous workset got killed self.workset.extend(nodes_to_kill) - def is_process_alive(self, pid): """Check if the process with the given PID is alive.""" @@ -619,7 +621,7 @@ def is_process_alive(self, pid): return False else: return True - + def get_child_processes(self, parent_pid): try: output = subprocess.check_output(['pgrep', '-P', str(parent_pid)]) @@ -628,9 +630,10 @@ def get_child_processes(self, parent_pid): # No child processes were found return [] - def __kill_node(self, cmd_id: NodeId): + def __kill_node(self, cmd_id: "NodeId"): logging.debug(f'Killing and restarting node {cmd_id} because some workspaces have to be committed') proc_to_kill, trace_file, _stdout, _stderr, _variable_file = self.commands_currently_executing.pop(cmd_id) + # Add the trace file to the banned file list so we know to ignore the CommandExecComplete response self.banned_files.add(trace_file) # Get all child processes of proc_to_kill @@ -638,25 +641,32 @@ def __kill_node(self, cmd_id: NodeId): # Kill all child processes for child in children: - try: - # Send SIGTERM signal; you can also use 'SIGKILL' for a forceful kill - subprocess.check_call(['kill', '-TERM', str(child)]) - except subprocess.CalledProcessError: - logging.debug(f"Failed to kill PID {child}.") # Poll proc_to_kill and its children to ensure they're terminated - while any(self.is_process_alive(child) for child in children): - logging.debug(f"Child proc {child} still alive. Waiting...") - time.sleep(0.01) # Sleep for 10 milliseconds before checking again + kill_attempts = 0 + while self.is_process_alive(child) and kill_attempts < MAX_KILL_ATTEMPTS: + try: + # Send SIGKILL signal for a forceful kill + subprocess.check_call(['kill', '-9', str(child)]) + time.sleep(0.01) # Sleep for 10 milliseconds before checking again + except subprocess.CalledProcessError: + logging.debug(f"Failed to kill PID {child}.") + kill_attempts += 1 + + if kill_attempts >= MAX_KILL_ATTEMPTS: + logging.warning(f"Gave up killing child PID {child} after {MAX_KILL_ATTEMPTS} attempts.") # Terminate the main process - proc_to_kill.terminate() - time.sleep(0.01) - # If main process is alive, keep sending SIGKILL - while self.is_process_alive(proc_to_kill.pid): - logging.debug(f"Parent proc {proc_to_kill.pid} still alive. Attempting to kill again...") - proc_to_kill.kill() - time.sleep(0.001) # Sleep for 1 millisecond before checking again - logging.debug(self.is_process_alive(proc_to_kill.pid)) + kill_attempts = 0 + while self.is_process_alive(proc_to_kill.pid) and kill_attempts < MAX_KILL_ATTEMPTS: + try: + # Send SIGKILL signal for a forceful kill + subprocess.check_call(['kill', '-9', str(proc_to_kill.pid)]) + time.sleep(0.01) # Sleep for 10 milliseconds before checking again + except subprocess.CalledProcessError: + logging.debug(f"Failed to kill parent PID {proc_to_kill.pid}.") + kill_attempts += 1 + if kill_attempts >= MAX_KILL_ATTEMPTS: + logging.warning(f"Gave up killing parent PID {proc_to_kill.pid} after {MAX_KILL_ATTEMPTS} attempts.") def resolve_commands_that_can_be_resolved_and_push_frontier(self): cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() From 0ff2cb0ccd6f6a14a21aa2ec35f7aabbfc7c71e2 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 06:49:20 -0600 Subject: [PATCH 27/34] Update test workflow --- .github/workflows/tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index e7dfe210..193e1af2 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -28,7 +28,7 @@ jobs: os: - ubuntu-20.04 runs-on: ${{ matrix.os }} - # if: github.event.pull_request.draft == false + if: github.event.pull_request.draft == false steps: - uses: actions/checkout@v2 - name: Running Correctness Tests From 3a96e207d0184fb54440755ec318e2bb3ab4d631 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 07:12:59 -0600 Subject: [PATCH 28/34] Move process handling to util --- parallel-orch/config.py | 2 + parallel-orch/partial_program_order.py | 52 +++----------------------- parallel-orch/util.py | 39 ++++++++++++++++++- 3 files changed, 45 insertions(+), 48 deletions(-) diff --git a/parallel-orch/config.py b/parallel-orch/config.py index 45c2c9ea..0f66570e 100644 --- a/parallel-orch/config.py +++ b/parallel-orch/config.py @@ -34,3 +34,5 @@ def log_root(msg, *args, **kwargs): SOCKET_BUF_SIZE = 8192 SCHEDULER_SOCKET = os.getenv("PASH_SPEC_SCHEDULER_SOCKET") + +MAX_KILL_ATTEMPTS = 10 # Define a maximum number of kill attempts for each process in the partial program order \ No newline at end of file diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 7074996b..c1caa65d 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -6,13 +6,10 @@ import analysis import executor import trace -import time -import subprocess +import util from shasta.ast_node import AstNode, CommandNode -MAX_KILL_ATTEMPTS = 10 # Define a maximum number of kill attempts for each process - class CompletedNodeInfo: def __init__(self, exit_code, variable_file, stdout_file): @@ -613,23 +610,6 @@ def __kill_all_currently_executing_and_schedule_restart(self): # Previous workset got killed self.workset.extend(nodes_to_kill) - def is_process_alive(self, pid): - """Check if the process with the given PID is alive.""" - try: - os.kill(pid, 0) - except OSError: - return False - else: - return True - - def get_child_processes(self, parent_pid): - try: - output = subprocess.check_output(['pgrep', '-P', str(parent_pid)]) - return [int(pid) for pid in output.decode('utf-8').split()] - except subprocess.CalledProcessError: - # No child processes were found - return [] - def __kill_node(self, cmd_id: "NodeId"): logging.debug(f'Killing and restarting node {cmd_id} because some workspaces have to be committed') proc_to_kill, trace_file, _stdout, _stderr, _variable_file = self.commands_currently_executing.pop(cmd_id) @@ -637,36 +617,14 @@ def __kill_node(self, cmd_id: "NodeId"): self.banned_files.add(trace_file) # Get all child processes of proc_to_kill - children = self.get_child_processes(proc_to_kill.pid) + children = util.get_child_processes(proc_to_kill.pid) # Kill all child processes for child in children: - kill_attempts = 0 - while self.is_process_alive(child) and kill_attempts < MAX_KILL_ATTEMPTS: - try: - # Send SIGKILL signal for a forceful kill - subprocess.check_call(['kill', '-9', str(child)]) - time.sleep(0.01) # Sleep for 10 milliseconds before checking again - except subprocess.CalledProcessError: - logging.debug(f"Failed to kill PID {child}.") - kill_attempts += 1 - - if kill_attempts >= MAX_KILL_ATTEMPTS: - logging.warning(f"Gave up killing child PID {child} after {MAX_KILL_ATTEMPTS} attempts.") - + self.kill_process(child) + # Terminate the main process - kill_attempts = 0 - while self.is_process_alive(proc_to_kill.pid) and kill_attempts < MAX_KILL_ATTEMPTS: - try: - # Send SIGKILL signal for a forceful kill - subprocess.check_call(['kill', '-9', str(proc_to_kill.pid)]) - time.sleep(0.01) # Sleep for 10 milliseconds before checking again - except subprocess.CalledProcessError: - logging.debug(f"Failed to kill parent PID {proc_to_kill.pid}.") - kill_attempts += 1 - - if kill_attempts >= MAX_KILL_ATTEMPTS: - logging.warning(f"Gave up killing parent PID {proc_to_kill.pid} after {MAX_KILL_ATTEMPTS} attempts.") + self.kill_process(proc_to_kill.pid) def resolve_commands_that_can_be_resolved_and_push_frontier(self): cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() diff --git a/parallel-orch/util.py b/parallel-orch/util.py index f70de822..6fb5334f 100644 --- a/parallel-orch/util.py +++ b/parallel-orch/util.py @@ -3,7 +3,7 @@ import os import socket import tempfile - +import time def ptempfile(): fd, name = tempfile.mkstemp(dir=config.PASH_SPEC_TMP_PREFIX) @@ -54,3 +54,40 @@ def socket_respond(connection: socket.socket, message: str): bytes_message = message.encode('utf-8') connection.sendall(bytes_message) connection.close() + +# Check if the process with the given PID is alive. +def is_process_alive(self, pid) -> bool: + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + +# Get all child process PIDs of a process +def get_child_processes(self, parent_pid) -> int: + try: + output = subprocess.check_output(['pgrep', '-P', str(parent_pid)]) + return [int(pid) for pid in output.decode('utf-8').split()] + except subprocess.CalledProcessError: + # No child processes were found + return [] + +# Kills the process with the provided PID. +# Returns True if the process was successfully killed, False otherwise. +def kill_process(self, pid: int) -> bool: + kill_attempts = 0 + while is_process_alive(pid) and kill_attempts < MAX_KILL_ATTEMPTS: + try: + # Send SIGKILL signal for a forceful kill + subprocess.check_call(['kill', '-9', str(pid)]) + time.sleep(0.01) # Sleep for 10 milliseconds before checking again + except subprocess.CalledProcessError: + logging.debug(f"Failed to kill PID {pid}.") + kill_attempts += 1 + + if kill_attempts >= MAX_KILL_ATTEMPTS: + logging.warning(f"Gave up killing PID {pid} after {MAX_KILL_ATTEMPTS} attempts.") + return False + + return True From 435eaab7f86ae0b0e9f1bb1fbf76347c05c53a39 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 09:32:35 -0600 Subject: [PATCH 29/34] Fix bug on process killing --- parallel-orch/config.py | 2 +- parallel-orch/partial_program_order.py | 4 ++-- parallel-orch/util.py | 15 ++++++++------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/parallel-orch/config.py b/parallel-orch/config.py index 0f66570e..f3169c51 100644 --- a/parallel-orch/config.py +++ b/parallel-orch/config.py @@ -35,4 +35,4 @@ def log_root(msg, *args, **kwargs): SCHEDULER_SOCKET = os.getenv("PASH_SPEC_SCHEDULER_SOCKET") -MAX_KILL_ATTEMPTS = 10 # Define a maximum number of kill attempts for each process in the partial program order \ No newline at end of file +MAX_KILL_ATTEMPTS = 10 # Define a maximum number of kill attempts for each process in the partial program order diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index c1caa65d..38140c43 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -621,10 +621,10 @@ def __kill_node(self, cmd_id: "NodeId"): # Kill all child processes for child in children: - self.kill_process(child) + util.kill_process(child) # Terminate the main process - self.kill_process(proc_to_kill.pid) + util.kill_process(proc_to_kill.pid) def resolve_commands_that_can_be_resolved_and_push_frontier(self): cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() diff --git a/parallel-orch/util.py b/parallel-orch/util.py index 6fb5334f..96cfffca 100644 --- a/parallel-orch/util.py +++ b/parallel-orch/util.py @@ -2,6 +2,7 @@ import logging import os import socket +import subprocess import tempfile import time @@ -56,7 +57,7 @@ def socket_respond(connection: socket.socket, message: str): connection.close() # Check if the process with the given PID is alive. -def is_process_alive(self, pid) -> bool: +def is_process_alive(pid) -> bool: try: os.kill(pid, 0) except OSError: @@ -65,7 +66,7 @@ def is_process_alive(self, pid) -> bool: return True # Get all child process PIDs of a process -def get_child_processes(self, parent_pid) -> int: +def get_child_processes(parent_pid) -> int: try: output = subprocess.check_output(['pgrep', '-P', str(parent_pid)]) return [int(pid) for pid in output.decode('utf-8').split()] @@ -75,19 +76,19 @@ def get_child_processes(self, parent_pid) -> int: # Kills the process with the provided PID. # Returns True if the process was successfully killed, False otherwise. -def kill_process(self, pid: int) -> bool: +def kill_process(pid: int) -> bool: kill_attempts = 0 - while is_process_alive(pid) and kill_attempts < MAX_KILL_ATTEMPTS: + while is_process_alive(pid) and kill_attempts < config.MAX_KILL_ATTEMPTS: try: # Send SIGKILL signal for a forceful kill subprocess.check_call(['kill', '-9', str(pid)]) - time.sleep(0.01) # Sleep for 10 milliseconds before checking again + time.sleep(0.005) # Sleep for 5 milliseconds before checking again except subprocess.CalledProcessError: logging.debug(f"Failed to kill PID {pid}.") kill_attempts += 1 - if kill_attempts >= MAX_KILL_ATTEMPTS: - logging.warning(f"Gave up killing PID {pid} after {MAX_KILL_ATTEMPTS} attempts.") + if kill_attempts >= config.MAX_KILL_ATTEMPTS: + logging.warning(f"Gave up killing PID {pid} after {config.MAX_KILL_ATTEMPTS} attempts.") return False return True From a58f63c864046bc2318c0691aa899cf301de5675 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 09:54:22 -0600 Subject: [PATCH 30/34] Adjust test timings for faster CI and cleanup test dir --- test/test_scripts/independent_commands.sh | 5 ----- test/test_scripts/semi_dependent_greps_2.sh | 8 ------- test/test_scripts/test1_1.sh | 6 +++--- test/test_scripts/test1_2.sh | 6 +++--- test/test_scripts/test1_3.sh | 4 ++-- test/test_scripts/test2_1.sh | 18 ++++++++-------- test/test_scripts/test2_2.sh | 20 ++++++++--------- test/test_scripts/test3_1.sh | 8 +++---- test/test_scripts/test3_2.sh | 6 +++--- test/test_scripts/test3_3.sh | 8 +++---- test/test_scripts/test4_1.sh | 4 ++-- test/test_scripts/test4_2.sh | 4 ++-- test/test_scripts/test4_3.sh | 4 ++-- test/test_scripts/test8.sh | 7 ------ test/test_scripts/test9_1.sh | 24 ++++++++++----------- test/test_scripts/test9_2.sh | 24 ++++++++++----------- test/test_scripts/test_loop.sh | 4 ++-- test/test_scripts/test_network_access_2.sh | 6 +++--- test/test_scripts/test_network_access_3.sh | 4 ++-- 19 files changed, 75 insertions(+), 95 deletions(-) delete mode 100755 test/test_scripts/independent_commands.sh delete mode 100644 test/test_scripts/semi_dependent_greps_2.sh delete mode 100755 test/test_scripts/test8.sh diff --git a/test/test_scripts/independent_commands.sh b/test/test_scripts/independent_commands.sh deleted file mode 100755 index 4fb2a283..00000000 --- a/test/test_scripts/independent_commands.sh +++ /dev/null @@ -1,5 +0,0 @@ -grep foo "$test_output_dir/in1" > "$test_output_dir/out1" -grep bar "$test_output_dir/in2" > "$test_output_dir/out2" -grep baz "$test_output_dir/in3" > "$test_output_dir/out3" -grep qux "$test_output_dir/in4" > "$test_output_dir/out4" -grep foo "$test_output_dir/in5" > "$test_output_dir/out5" \ No newline at end of file diff --git a/test/test_scripts/semi_dependent_greps_2.sh b/test/test_scripts/semi_dependent_greps_2.sh deleted file mode 100644 index 41a6b4a0..00000000 --- a/test/test_scripts/semi_dependent_greps_2.sh +++ /dev/null @@ -1,8 +0,0 @@ -grep foo "$test_output_dir/in1" > "$test_output_dir/out1" -grep bar "$test_output_dir/in2" > "$test_output_dir/out2" -grep baz "$test_output_dir/in3" > "$test_output_dir/out2" -grep baz "$test_output_dir/out2" > "$test_output_dir/out3" -grep bar "$test_output_dir/out3" > "$test_output_dir/out4" -grep baz "$test_output_dir/out3" > "$test_output_dir/out5" -grep foo "$test_output_dir/out1" > "$test_output_dir/out2" -grep foo "$test_output_dir/out2" > "$test_output_dir/out6" \ No newline at end of file diff --git a/test/test_scripts/test1_1.sh b/test/test_scripts/test1_1.sh index ae4be15e..07035f22 100755 --- a/test/test_scripts/test1_1.sh +++ b/test/test_scripts/test1_1.sh @@ -1,4 +1,4 @@ -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 2.5 "foo" "$test_output_dir/in1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 1 "foo" "$test_output_dir/out1" "$test_output_dir/out2" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "foo" "$test_output_dir/out2" "$test_output_dir/out3" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "foo" "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.2 "foo" "$test_output_dir/out1" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.1 "foo" "$test_output_dir/out2" "$test_output_dir/out3" pwd diff --git a/test/test_scripts/test1_2.sh b/test/test_scripts/test1_2.sh index 2a18ac90..794472bc 100755 --- a/test/test_scripts/test1_2.sh +++ b/test/test_scripts/test1_2.sh @@ -1,4 +1,4 @@ -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "foo" "$test_output_dir/in1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 1 "foo" "$test_output_dir/out1" "$test_output_dir/out2" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 2.5 "foo" "$test_output_dir/out2" "$test_output_dir/out3" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.05 "foo" "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.15 "foo" "$test_output_dir/out1" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.25 "foo" "$test_output_dir/out2" "$test_output_dir/out3" pwd diff --git a/test/test_scripts/test1_3.sh b/test/test_scripts/test1_3.sh index 86cdea3d..3d86eb17 100755 --- a/test/test_scripts/test1_3.sh +++ b/test/test_scripts/test1_3.sh @@ -1,4 +1,4 @@ -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 1 "foo" "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.1 "foo" "$test_output_dir/in1" "$test_output_dir/out1" "$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "foo" "$test_output_dir/out1" "$test_output_dir/out2" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 2.5 "foo" "$test_output_dir/out2" "$test_output_dir/out3" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.2 "foo" "$test_output_dir/out2" "$test_output_dir/out3" pwd diff --git a/test/test_scripts/test2_1.sh b/test/test_scripts/test2_1.sh index 36d8e867..9b584e34 100644 --- a/test/test_scripts/test2_1.sh +++ b/test/test_scripts/test2_1.sh @@ -1,13 +1,13 @@ # Tests speculation of completely independent cmds "$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.00 "hello0" "$test_output_dir/out0" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.06 "hello1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.09 "hello2" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.04 "hello1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.08 "hello2" "$test_output_dir/out2" "$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.12 "hello3" "$test_output_dir/out3" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.15 "hello4" "$test_output_dir/out4" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.18 "hello5" "$test_output_dir/out5" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.21 "hello6" "$test_output_dir/out6" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.24 "hello7" "$test_output_dir/out7" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.27 "hello8" "$test_output_dir/out8" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.3 "hello9" "$test_output_dir/out9" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.33 "hello10" "$test_output_dir/out10" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.16 "hello4" "$test_output_dir/out4" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.20 "hello5" "$test_output_dir/out5" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.24 "hello6" "$test_output_dir/out6" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.28 "hello7" "$test_output_dir/out7" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.32 "hello8" "$test_output_dir/out8" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.36 "hello9" "$test_output_dir/out9" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.40 "hello10" "$test_output_dir/out10" diff --git a/test/test_scripts/test2_2.sh b/test/test_scripts/test2_2.sh index e98e79c9..dd0a0039 100644 --- a/test/test_scripts/test2_2.sh +++ b/test/test_scripts/test2_2.sh @@ -1,13 +1,13 @@ # Tests speculation of completely independent cmds -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.5 "hello0" "$test_output_dir/out0" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.45 "hello1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.40 "hello2" "$test_output_dir/out2" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.35 "hello3" "$test_output_dir/out3" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.3 "hello4" "$test_output_dir/out4" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.25 "hello5" "$test_output_dir/out5" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.2 "hello6" "$test_output_dir/out6" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.15 "hello7" "$test_output_dir/out7" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.1 "hello8" "$test_output_dir/out8" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.05 "hello9" "$test_output_dir/out9" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.4 "hello0" "$test_output_dir/out0" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.36 "hello1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.32 "hello2" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.28 "hello3" "$test_output_dir/out3" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.24 "hello4" "$test_output_dir/out4" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.20 "hello5" "$test_output_dir/out5" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.16 "hello6" "$test_output_dir/out6" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.12 "hello7" "$test_output_dir/out7" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.08 "hello8" "$test_output_dir/out8" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.04 "hello9" "$test_output_dir/out9" "$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.0 "hello10" "$test_output_dir/out10" diff --git a/test/test_scripts/test3_1.sh b/test/test_scripts/test3_1.sh index 0ba712ae..94fe24b6 100755 --- a/test/test_scripts/test3_1.sh +++ b/test/test_scripts/test3_1.sh @@ -1,5 +1,5 @@ "$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.0 "foo" "$test_output_dir/in1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "bar" "$test_output_dir/in2" "$test_output_dir/out2" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.7 "bar" "$test_output_dir/out2" "$test_output_dir/out3" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 1.3 "baz" "$test_output_dir/in3" "$test_output_dir/out4" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 1.7 "bar" "$test_output_dir/out3" "$test_output_dir/out5" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.1 "bar" "$test_output_dir/in2" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.2 "bar" "$test_output_dir/out2" "$test_output_dir/out3" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.4 "baz" "$test_output_dir/in3" "$test_output_dir/out4" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.5 "bar" "$test_output_dir/out3" "$test_output_dir/out5" diff --git a/test/test_scripts/test3_2.sh b/test/test_scripts/test3_2.sh index b64433bf..d43f03c9 100755 --- a/test/test_scripts/test3_2.sh +++ b/test/test_scripts/test3_2.sh @@ -1,5 +1,5 @@ -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.0 "foo" "$test_output_dir/in1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.5 "bar" "$test_output_dir/in2" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.1 "foo" "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.4 "bar" "$test_output_dir/in2" "$test_output_dir/out2" "$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.0 "bar" "$test_output_dir/out2" "$test_output_dir/out3" "$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "baz" "$test_output_dir/in3" "$test_output_dir/out4" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 2 "bar" "$test_output_dir/out3" "$test_output_dir/out5" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.2 "bar" "$test_output_dir/out3" "$test_output_dir/out5" diff --git a/test/test_scripts/test3_3.sh b/test/test_scripts/test3_3.sh index 766159f8..be9b0604 100755 --- a/test/test_scripts/test3_3.sh +++ b/test/test_scripts/test3_3.sh @@ -1,5 +1,5 @@ -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 1.7 "foo" "$test_output_dir/in1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 1.3 "bar" "$test_output_dir/in2" "$test_output_dir/out2" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.7 "bar" "$test_output_dir/out2" "$test_output_dir/out3" -"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "baz" "$test_output_dir/in3" "$test_output_dir/out4" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.4 "foo" "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.3 "bar" "$test_output_dir/in2" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.2 "bar" "$test_output_dir/out2" "$test_output_dir/out3" +"$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.1 "baz" "$test_output_dir/in3" "$test_output_dir/out4" "$MISC_SCRIPT_DIR/sleep_and_grep.sh" 0.0 "bar" "$test_output_dir/out3" "$test_output_dir/out5" diff --git a/test/test_scripts/test4_1.sh b/test/test_scripts/test4_1.sh index a1b9b1f8..8fb69e07 100755 --- a/test/test_scripts/test4_1.sh +++ b/test/test_scripts/test4_1.sh @@ -1,5 +1,5 @@ # Tests that w/w dependencies get scheduled correctly -"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.5 "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.3 "$test_output_dir/in1" "$test_output_dir/out1" "$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0 "$test_output_dir/out1" "$test_output_dir/out2" -"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.2 "$test_output_dir/in2" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.15 "$test_output_dir/in2" "$test_output_dir/out1" diff --git a/test/test_scripts/test4_2.sh b/test/test_scripts/test4_2.sh index 8f82db2b..81362366 100755 --- a/test/test_scripts/test4_2.sh +++ b/test/test_scripts/test4_2.sh @@ -1,5 +1,5 @@ # Tests that w/w dependencies get scheduled correctly -"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.5 "$test_output_dir/in1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.2 "$test_output_dir/out1" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.3 "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.15 "$test_output_dir/out1" "$test_output_dir/out2" "$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0 "$test_output_dir/in2" "$test_output_dir/out1" diff --git a/test/test_scripts/test4_3.sh b/test/test_scripts/test4_3.sh index 3c0c5dfa..3ed78c94 100755 --- a/test/test_scripts/test4_3.sh +++ b/test/test_scripts/test4_3.sh @@ -1,5 +1,5 @@ # Tests that w/w dependencies get scheduled correctly -"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.2 "$test_output_dir/in1" "$test_output_dir/out1" -"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.5 "$test_output_dir/out1" "$test_output_dir/out2" +"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.15 "$test_output_dir/in1" "$test_output_dir/out1" +"$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0.3 "$test_output_dir/out1" "$test_output_dir/out2" "$MISC_SCRIPT_DIR/sleep_and_cat.sh" 0 "$test_output_dir/in2" "$test_output_dir/out1" diff --git a/test/test_scripts/test8.sh b/test/test_scripts/test8.sh deleted file mode 100755 index 403a6b40..00000000 --- a/test/test_scripts/test8.sh +++ /dev/null @@ -1,7 +0,0 @@ -# tests successfull scheduling of Riker stopped cmds - -echo "hello1" > "$test_output_dir/out1" -ping localhost -c 5 -q | head -1 > "$test_output_dir/out1" -ping localhost -c 5 -q | head -1 > "$test_output_dir/out2" -ping localhost -c 3 -q | head -1 > "$test_output_dir/out3" -ping localhost -c 1 -q | head -1 > "$test_output_dir/out1" diff --git a/test/test_scripts/test9_1.sh b/test/test_scripts/test9_1.sh index 96d190cb..8968cb64 100755 --- a/test/test_scripts/test9_1.sh +++ b/test/test_scripts/test9_1.sh @@ -1,16 +1,16 @@ # Tests simple directory creation # using mkdir -p -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 4 "$test_output_dir/1/2/3/4" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 3.4 "$test_output_dir/1/2" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 2.5 "$test_output_dir/1/3" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 2 "$test_output_dir/1/3/4/5" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 1.5 "$test_output_dir/1/2/3/4/5" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 1.3 "$test_output_dir/1/2/3/4/5" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 1.1 "$test_output_dir/1/2/3/4/5/6" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.9 "hello1" "$test_output_dir/1/1.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.7 "hello2" "$test_output_dir/1/3/2.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.5 "hello3" "$test_output_dir/1/3.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.3 "hello4" "$test_output_dir/1/2/3/4/4.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.2 "hello5" "$test_output_dir/1/2/3/4/5/5.txt" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.6 "$test_output_dir/1/2/3/4" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.55 "$test_output_dir/1/2" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.5 "$test_output_dir/1/3" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.45 "$test_output_dir/1/3/4/5" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.4 "$test_output_dir/1/2/3/4/5" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.35 "$test_output_dir/1/2/3/4/5" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.3 "$test_output_dir/1/2/3/4/5/6" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.25 "hello1" "$test_output_dir/1/1.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.2 "hello2" "$test_output_dir/1/3/2.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.15 "hello3" "$test_output_dir/1/3.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.1 "hello4" "$test_output_dir/1/2/3/4/4.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.05 "hello5" "$test_output_dir/1/2/3/4/5/5.txt" "$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0 "hello6" "$test_output_dir/1/2/3/4/5/6/6.txt" diff --git a/test/test_scripts/test9_2.sh b/test/test_scripts/test9_2.sh index 4b935992..8e71bc2c 100755 --- a/test/test_scripts/test9_2.sh +++ b/test/test_scripts/test9_2.sh @@ -2,15 +2,15 @@ # using mkdir -p "$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0 "$test_output_dir/1/2/3/4" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.1 "$test_output_dir/1/2" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.3 "$test_output_dir/1/3" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.5 "$test_output_dir/1/3/4/5" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.7 "$test_output_dir/1/2/3/4/5" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 1.1 "$test_output_dir/1/2/3/4/5" -"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 1.3 "$test_output_dir/1/2/3/4/5/6" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 1.7 "hello1" "$test_output_dir/1/1.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 2.3 "hello2" "$test_output_dir/1/3/2.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 2.7 "hello3" "$test_output_dir/1/3.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 3 "hello4" "$test_output_dir/1/2/3/4/4.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 3.6 "hello5" "$test_output_dir/1/2/3/4/5/5.txt" -"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 4 "hello6" "$test_output_dir/1/2/3/4/5/6/6.txt" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.05 "$test_output_dir/1/2" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.1 "$test_output_dir/1/3" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.15 "$test_output_dir/1/3/4/5" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.2 "$test_output_dir/1/2/3/4/5" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.25 "$test_output_dir/1/2/3/4/5" +"$MISC_SCRIPT_DIR/sleep_and_mkdir_p.sh" 0.3 "$test_output_dir/1/2/3/4/5/6" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.4 "hello1" "$test_output_dir/1/1.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.45 "hello2" "$test_output_dir/1/3/2.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.5 "hello3" "$test_output_dir/1/3.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.55 "hello4" "$test_output_dir/1/2/3/4/4.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.6 "hello5" "$test_output_dir/1/2/3/4/5/5.txt" +"$MISC_SCRIPT_DIR/sleep_and_echo.sh" 0.65 "hello6" "$test_output_dir/1/2/3/4/5/6/6.txt" diff --git a/test/test_scripts/test_loop.sh b/test/test_scripts/test_loop.sh index d8602fe5..7cbdc78a 100644 --- a/test/test_scripts/test_loop.sh +++ b/test/test_scripts/test_loop.sh @@ -3,7 +3,7 @@ for i in 1 2; do echo hi1 for j in 1 2 3; do echo hi2 - sleep 0.5 + sleep 0.2 echo hi3 done echo hi4 @@ -15,7 +15,7 @@ for i in 1 2 3; do echo hi7 for j in 1 2; do echo hi8 - sleep 0.5 + sleep 0.2 echo hi9 done echo hi10 diff --git a/test/test_scripts/test_network_access_2.sh b/test/test_scripts/test_network_access_2.sh index 534207e2..3c999f38 100644 --- a/test/test_scripts/test_network_access_2.sh +++ b/test/test_scripts/test_network_access_2.sh @@ -1,4 +1,4 @@ -"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.1 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.3 "$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.5 -"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.9 -"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 1.3 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.7 diff --git a/test/test_scripts/test_network_access_3.sh b/test/test_scripts/test_network_access_3.sh index 31f58e08..fdd65d53 100644 --- a/test/test_scripts/test_network_access_3.sh +++ b/test/test_scripts/test_network_access_3.sh @@ -1,4 +1,4 @@ -"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 1.3 -"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.9 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.7 "$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.5 +"$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0.3 "$MISC_SCRIPT_DIR/sleep_and_curl.sh" 0 From 27f46c834c73851dac65dac42350ee64009bceb4 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 11:39:14 -0600 Subject: [PATCH 31/34] Get rid of redundant frontier refs --- parallel-orch/partial_program_order.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 38140c43..c2f3764d 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -243,7 +243,7 @@ def __init__(self, nodes, edges): ## TODO: Add assertions that committed etc do not contain loop nodes self.committed = set() ## Nodes that are in the frontier can only move to committed - self.frontier = [] + self.to_check_for_commit = [] self.rw_sets = {node_id: None for node_id in self.nodes.keys()} self.workset = [] ## A dictionary from cmd_ids that are currently executing that contains their trace_files @@ -366,7 +366,6 @@ def is_closed_sub_partial_order(self, node_ids: "list[NodeId]") -> bool: def init_partial_order(self): ## Initialize the frontier with all non-loop source nodes - self.frontier = self.get_standard_source_nodes() ## Initialize the workset self.init_workset() logging.debug(f'Initialized workset') @@ -832,7 +831,7 @@ def progress_po_due_to_wait(self, node_id: NodeId): next_nodes = self.get_next(new_nodes_sink) next_standard_nodes = self.filter_standard_nodes(next_nodes) logging.trace(f"Adding its next nodes to the frontier|{','.join(str(node_id) for node_id in next_standard_nodes)}") - self.frontier.extend(next_standard_nodes) + self.to_check_for_commit.extend(next_standard_nodes) @@ -1029,7 +1028,7 @@ def unroll_loop_node(self, target_concrete_node_id: NodeId): # GL 2023-05-22: __frontier_commit_and_push() should be called here instead of step_forward() # Although without it the test cases pass - self.frontier.append(new_first_node_id) + self.to_check_for_commit.append(new_first_node_id) ## At the end of unrolling the target node must be part of the PO assert(self.is_node_id(target_concrete_node_id)) @@ -1048,21 +1047,20 @@ def maybe_unroll(self, node_id: NodeId) -> NodeId: ## This function is not safe to call on its own, since it might leave the PO in a broken state ## It should be called right after def __frontier_commit_and_push(self): - logging.debug(" > Commiting and pushing frontier") - logging.debug(f' > Frontier: {self.frontier}') + self.to_check_for_commit.extend(self.get_all_next_non_committed_nodes()) changes_in_frontier = True while changes_in_frontier: new_frontier = [] changes_in_frontier = False # Second condition below may be unecessary - for frontier_node in self.frontier: + for frontier_node in self.to_check_for_commit: ## If a node is not in the workset it means that it is actually done executing ## KK 2023-05-10 Do we need all these conditions in here? Some might be redundant? if frontier_node not in self.get_currently_executing() \ and frontier_node not in self.get_committed() \ and frontier_node not in self.stopped \ and frontier_node not in self.speculated \ - and frontier_node not in self.workset\ + and frontier_node not in self.workset \ and not self.is_loop_node(frontier_node): ## Commit the node self.commit_node(frontier_node) @@ -1081,7 +1079,7 @@ def __frontier_commit_and_push(self): logging.debug(f" > Not commiting node {frontier_node}, readding to frontier") ## Update the frontier to the new frontier - self.frontier = new_frontier + self.to_check_for_commit = new_frontier ## For a file - dir forward dependency to exist, @@ -1298,7 +1296,7 @@ def log_partial_program_order_info(self): logging.debug(f"=" * 80) logging.debug(f"WORKSET: {self.get_workset()}") logging.debug(f"COMMITTED: {self.get_committed_list()}") - logging.debug(f"FRONTIER: {self.frontier}") + logging.debug(f"FRONTIER: {self.to_check_for_commit}") logging.debug(f"EXECUTING: {list(self.commands_currently_executing.keys())}") logging.debug(f"STOPPED: {list(self.stopped)}") logging.debug(f" of which UNSAFE: {list(self.get_unsafe())}") From d165cfd4d885412f4d86d99f172cc219a4504205 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Tue, 15 Aug 2023 23:52:40 -0600 Subject: [PATCH 32/34] Revert "Get rid of redundant frontier refs" This reverts commit 27f46c834c73851dac65dac42350ee64009bceb4. --- parallel-orch/partial_program_order.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index c2f3764d..38140c43 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -243,7 +243,7 @@ def __init__(self, nodes, edges): ## TODO: Add assertions that committed etc do not contain loop nodes self.committed = set() ## Nodes that are in the frontier can only move to committed - self.to_check_for_commit = [] + self.frontier = [] self.rw_sets = {node_id: None for node_id in self.nodes.keys()} self.workset = [] ## A dictionary from cmd_ids that are currently executing that contains their trace_files @@ -366,6 +366,7 @@ def is_closed_sub_partial_order(self, node_ids: "list[NodeId]") -> bool: def init_partial_order(self): ## Initialize the frontier with all non-loop source nodes + self.frontier = self.get_standard_source_nodes() ## Initialize the workset self.init_workset() logging.debug(f'Initialized workset') @@ -831,7 +832,7 @@ def progress_po_due_to_wait(self, node_id: NodeId): next_nodes = self.get_next(new_nodes_sink) next_standard_nodes = self.filter_standard_nodes(next_nodes) logging.trace(f"Adding its next nodes to the frontier|{','.join(str(node_id) for node_id in next_standard_nodes)}") - self.to_check_for_commit.extend(next_standard_nodes) + self.frontier.extend(next_standard_nodes) @@ -1028,7 +1029,7 @@ def unroll_loop_node(self, target_concrete_node_id: NodeId): # GL 2023-05-22: __frontier_commit_and_push() should be called here instead of step_forward() # Although without it the test cases pass - self.to_check_for_commit.append(new_first_node_id) + self.frontier.append(new_first_node_id) ## At the end of unrolling the target node must be part of the PO assert(self.is_node_id(target_concrete_node_id)) @@ -1047,20 +1048,21 @@ def maybe_unroll(self, node_id: NodeId) -> NodeId: ## This function is not safe to call on its own, since it might leave the PO in a broken state ## It should be called right after def __frontier_commit_and_push(self): - self.to_check_for_commit.extend(self.get_all_next_non_committed_nodes()) + logging.debug(" > Commiting and pushing frontier") + logging.debug(f' > Frontier: {self.frontier}') changes_in_frontier = True while changes_in_frontier: new_frontier = [] changes_in_frontier = False # Second condition below may be unecessary - for frontier_node in self.to_check_for_commit: + for frontier_node in self.frontier: ## If a node is not in the workset it means that it is actually done executing ## KK 2023-05-10 Do we need all these conditions in here? Some might be redundant? if frontier_node not in self.get_currently_executing() \ and frontier_node not in self.get_committed() \ and frontier_node not in self.stopped \ and frontier_node not in self.speculated \ - and frontier_node not in self.workset \ + and frontier_node not in self.workset\ and not self.is_loop_node(frontier_node): ## Commit the node self.commit_node(frontier_node) @@ -1079,7 +1081,7 @@ def __frontier_commit_and_push(self): logging.debug(f" > Not commiting node {frontier_node}, readding to frontier") ## Update the frontier to the new frontier - self.to_check_for_commit = new_frontier + self.frontier = new_frontier ## For a file - dir forward dependency to exist, @@ -1296,7 +1298,7 @@ def log_partial_program_order_info(self): logging.debug(f"=" * 80) logging.debug(f"WORKSET: {self.get_workset()}") logging.debug(f"COMMITTED: {self.get_committed_list()}") - logging.debug(f"FRONTIER: {self.to_check_for_commit}") + logging.debug(f"FRONTIER: {self.frontier}") logging.debug(f"EXECUTING: {list(self.commands_currently_executing.keys())}") logging.debug(f"STOPPED: {list(self.stopped)}") logging.debug(f" of which UNSAFE: {list(self.get_unsafe())}") From 1dd2da2c7ce3905f84cef88619f2029530b0dc3d Mon Sep 17 00:00:00 2001 From: gliargovas Date: Wed, 16 Aug 2023 00:09:10 -0600 Subject: [PATCH 33/34] Add logs for debugging test_network_access_2 --- parallel-orch/template_script_to_execute.sh | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/parallel-orch/template_script_to_execute.sh b/parallel-orch/template_script_to_execute.sh index 38fae8da..36e44945 100755 --- a/parallel-orch/template_script_to_execute.sh +++ b/parallel-orch/template_script_to_execute.sh @@ -26,12 +26,14 @@ else rkr_cmd="rkr --frontier" fi -strace -o out $rkr_cmd --db "$TEMPDIR" --rikerfile "$TEMPDIR/Rikerfile" +$rkr_cmd --db "$TEMPDIR" --rikerfile "$TEMPDIR/Rikerfile" exit_code="$?" -echo 'first riker run done' 1>&2 - - +if [ "$exit_code" -eq 0 ]; then + echo "first riker run done (Node: ${CMD_ID})" 1>&2 +else + echo "Riker error: first Riker command failed with EC $exit_code - (Node: ${CMD_ID})" 1>&2 +fi rkr --db "$TEMPDIR" --rikerfile "$TEMPDIR/Rikerfile" --debug trace -o "$TRACE_FILE" > /dev/null echo 'second riker run done' 1>&2 From 2d8fdae7f1d71485d08511e29a0ecf31b620cd07 Mon Sep 17 00:00:00 2001 From: gliargovas Date: Mon, 21 Aug 2023 01:13:32 -0600 Subject: [PATCH 34/34] Add node for proc killing --- parallel-orch/util.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parallel-orch/util.py b/parallel-orch/util.py index 96cfffca..b79db475 100644 --- a/parallel-orch/util.py +++ b/parallel-orch/util.py @@ -74,6 +74,8 @@ def get_child_processes(parent_pid) -> int: # No child processes were found return [] +# Note: Check this function as it does not seem the right way to kill a proc. +# SIGKILL should be sent once and for all. # Kills the process with the provided PID. # Returns True if the process was successfully killed, False otherwise. def kill_process(pid: int) -> bool: