diff --git a/deps/pash b/deps/pash index 41501148..5de0f2cf 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit 41501148ab154656fd19bbcb43584f450ed33083 +Subproject commit 5de0f2cf5283648f28d06f5a5377729237ee809f diff --git a/parallel-orch/node.py b/parallel-orch/node.py index 2d3e1d58..a5d3ff24 100644 --- a/parallel-orch/node.py +++ b/parallel-orch/node.py @@ -15,14 +15,10 @@ import analysis STATE_LOG = '[STATE_LOG] ' -OVERHEAD_LOG = '[OVERHEAD_LOG] ' def state_log(s): logging.info(STATE_LOG + s) -def overhead_log(s): - logging.info(OVERHEAD_LOG + s) - class NodeState(Enum): INIT = auto() READY = auto() @@ -532,18 +528,18 @@ def parse_env(content): with open(other_env, 'r') as file: other_env_vars = parse_env(file.read()) - logging.debug(f"Comparing env files {self.exec_ctxt.pre_env_file} and {other_env}") + util.debug_log(f"Comparing env files {self.exec_ctxt.pre_env_file} and {other_env}") conflict_exists = False for key in set(node_env_vars.keys()).union(other_env_vars.keys()): if key not in node_env_vars: - logging.debug(f"Variable {key} missing in node environment") + util.debug_log(f"Variable {key} missing in node environment") conflict_exists = True elif key not in other_env_vars: - logging.debug(f"Variable {key} missing in other environment") + util.debug_log(f"Variable {key} missing in other environment") conflict_exists = True elif node_env_vars[key] != other_env_vars[key]: - logging.debug(f"Variable {key} differs: node environment has {node_env_vars[key]}, other has {other_env_vars[key]}") + util.debug_log(f"Variable {key} differs: node environment has {node_env_vars[key]}, other has {other_env_vars[key]}") conflict_exists = True with open(self.exec_ctxt.pre_env_file + '.fds', 'r') as file1, open(other_env + '.fds', 'r') as file2: @@ -559,7 +555,7 @@ def parse_env(content): return conflict_exists def kill_children(self): - overhead_log(f"KILL|{self.cnid}") + util.overhead_log(f"KILL|{self.cnid}") assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING] process = self.exec_ctxt.process try: @@ -567,7 +563,7 @@ def kill_children(self): except ProcessLookupError: pass process.wait() - overhead_log(f"KILL_END|{self.cnid}") + util.overhead_log(f"KILL_END|{self.cnid}") def commit_fd_writes(self): with open(self.exec_ctxt.pre_env_file + '.fds', 'r') as f: @@ -615,7 +611,7 @@ def reset_to_ready(self, spec_pre_env: str = None): assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING, NodeState.SPECULATED] - logging.info(f"Resetting node {self.id_} to ready {self.exec_id}") + state_log(f"Resetting node {self.id_} to ready {self.exec_id}") # We reset the exec id so if we receive a message # due to a race condition, we will ignore it. self.exec_id = None @@ -623,9 +619,9 @@ def reset_to_ready(self, spec_pre_env: str = None): # TODO: make this more sophisticated if self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]: self.kill_children() - overhead_log(f"DELETE_SANDBOX|{self.cnid}") + util.overhead_log(f"DELETE_SANDBOX|{self.cnid}") util.delete_sandbox(self.exec_ctxt.sandbox_dir) - overhead_log(f"DELETE_SANDBOX_END|{self.cnid}") + util.overhead_log(f"DELETE_SANDBOX_END|{self.cnid}") self.exec_ctxt = None self.exec_result = None if spec_pre_env is not None: @@ -671,10 +667,10 @@ def commit_frontier_execution(self): self.trace_fd = None self.trace_ctx = None self.update_loop_list_context() - overhead_log(f"COMMIT|{self.cnid}") + util.overhead_log(f"COMMIT|{self.cnid}") executor.commit_workspace(self.exec_ctxt.sandbox_dir) # self.commit_fd_writes() - overhead_log(f"COMMIT_END|{self.cnid}") + util.overhead_log(f"COMMIT_END|{self.cnid}") # util.delete_sandbox(self.exec_ctxt.sandbox_dir) self.fixup_fds() self.state = NodeState.COMMITTED @@ -696,10 +692,10 @@ def finish_spec_execution(self): def commit_speculated(self): assert self.state == NodeState.SPECULATED - overhead_log(f"COMMIT|{self.cnid}") + util.overhead_log(f"COMMIT|{self.cnid}") executor.commit_workspace(self.exec_ctxt.sandbox_dir) # self.commit_fd_writes() - overhead_log(f"COMMIT_END|{self.cnid}") + util.overhead_log(f"COMMIT_END|{self.cnid}") # util.delete_sandbox(self.exec_ctxt.sandbox_dir) self.state = NodeState.COMMITTED self.trace_state() diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index 8b4c9adc..d2e8af3c 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -8,16 +8,13 @@ PROG_LOG = '[PROG_LOG] ' EVENT_LOG = '[EVENT_LOG] ' -OVERHEAD_LOG = '[OVERHEAD_LOG] ' def event_log(s): logging.info(EVENT_LOG + s) def progress_log(s): - logging.info(PROG_LOG + s) - -def overhead_log(s): - logging.info(OVERHEAD_LOG + s) + # logging.info(PROG_LOG + s) + pass def simulate_loop_iter_env(env, var, loop_list_context, loop_iters): loop_list = loop_list_context.get_top() @@ -98,7 +95,7 @@ def get_frontier(self): return self.frontier def log_info(self): - logging.info(f"Nodes: {self.concrete_nodes}") + # logging.info(f"Nodes: {self.concrete_nodes}") # logging.info(f"Adjacency: {self.adjacency}") # logging.info(f"Inverse adjacency: {self.inverse_adjacency}") self.log_state() @@ -407,9 +404,9 @@ def valid(self): def fetch_fs_actions(self): for node in self.get_executing_normal_and_spec_nodes(): - overhead_log(f"TRACE_FETCHING|{node.cnid}") + util.overhead_log(f"TRACE_FETCHING|{node.cnid}") node.gather_fs_actions() - overhead_log(f"TRACE_FETCHING_END|{node.cnid}") + util.overhead_log(f"TRACE_FETCHING_END|{node.cnid}") def _has_fs_deps(self, concrete_node_id: ConcreteNodeId): node_of_interest : ConcreteNode = self.get_concrete_node(concrete_node_id) diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 6707686a..4eaecc5d 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -83,14 +83,14 @@ def __init__(self, socket_file, window): def handle_init(self, input_cmd: str): assert(input_cmd.startswith("Init")) partial_order_file = input_cmd.split(":")[1].rstrip() - logging.debug(f'Scheduler: Received partial_order_file: {partial_order_file}') + util.debug_log(f'Scheduler: Received partial_order_file: {partial_order_file}') self.partial_program_order = util.parse_partial_program_order_from_file(partial_order_file) util.debug_log(str(self.partial_program_order.hsprog)) def handle_wait(self, input_cmd: str, connection): concrete_node_id, env_file = self.__parse_wait(input_cmd) self.waiting_for_response[concrete_node_id] = connection - logging.info(f'Scheduler: Received wait message - {concrete_node_id}.') + util.debug_log(f'Scheduler: Received wait message - {concrete_node_id}.') self.latest_env = env_file if self.partial_program_order.pre_handle_wait(concrete_node_id, env_file): self.partial_program_order.handle_wait(concrete_node_id, env_file) @@ -112,13 +112,13 @@ def process_next_cmd(self): connection.close() self.handle_init(input_cmd) elif (input_cmd.startswith("Daemon Start") or input_cmd == ""): - logging.info(f'Scheduler: Received daemon start message.') + util.debug_log(f'Scheduler: Received daemon start message.') connection.close() elif (input_cmd.startswith("CommandExecComplete:")): node_id, exec_id, sandbox_dir, trace_file = self.__parse_command_exec_x(input_cmd) connection.close() if self.partial_program_order.get_concrete_node(node_id).exec_id == exec_id: - logging.info(f'Scheduler: Received command exec complete message - {node_id}.') + util.debug_log(f'Scheduler: Received command exec complete message - {node_id}.') self.partial_program_order.handle_complete(node_id, node_id in self.waiting_for_response, self.latest_env) if self.partial_program_order.get_concrete_node(node_id).is_committed(): @@ -128,7 +128,7 @@ def process_next_cmd(self): self.partial_program_order.finish_wait_unsafe(node_id, self.latest_env) self.respond_to_wait_on_unsafe(node_id) else: - logging.info(f'Scheduler: Received command exec complete message for a killed instance, ignoring - {node_id}.') + util.debug_log(f'Scheduler: Received command exec complete message for a killed instance, ignoring - {node_id}.') elif (input_cmd.startswith("Wait")): self.handle_wait(input_cmd, connection) elif (input_cmd.startswith("Done")): diff --git a/parallel-orch/util.py b/parallel-orch/util.py index c76d55ca..56f2f13d 100644 --- a/parallel-orch/util.py +++ b/parallel-orch/util.py @@ -19,6 +19,10 @@ def debug_log(s): logging.debug(DEBUG_LOG + s) +def overhead_log(s): + # logging.debug(DEBUG_LOG + s) + pass + def ptempfile(prefix=''): fd, name = tempfile.mkstemp(dir=config.PASH_SPEC_TMP_PREFIX, prefix=prefix+'_') ## TODO: Get a name without opening the fd too if possible @@ -298,18 +302,18 @@ def parse_partial_program_order_from_file(file_path: str): loop_context_end = number_of_nodes + loop_context_start loop_context_lines = lines[loop_context_start:loop_context_end] loop_contexts = parse_loop_contexts(loop_context_lines) - logging.debug(f'Loop contexts: {loop_contexts}') + debug_log(f'Loop contexts: {loop_contexts}') var_assignment_lines = int(lines[loop_context_end]) var_assignment_start = loop_context_end + 1 var_assignment_end = var_assignment_start + var_assignment_lines var_assignment_lines = lines[var_assignment_start:var_assignment_end] var_assignments = parse_var_assignment_lines(var_assignment_lines) - logging.debug(f'Var assignments: {var_assignments}') + debug_log(f'Var assignments: {var_assignments}') ## The rest of the lines are edge_lines edge_lines = lines[var_assignment_end:] - logging.debug(f'Edges: {edge_lines}') + debug_log(f'Edges: {edge_lines}') ab_nodes = {} for i in range(number_of_nodes): @@ -330,8 +334,8 @@ def parse_partial_program_order_from_file(file_path: str): from_id, to_id = parse_edge_line(edge_line) edges[NodeId(from_id)].append(NodeId(to_id)) - logging.info(f"Nodes|{','.join([str(node) for node in ab_nodes])}") - logging.info(f"Edges|{edges}") + debug_log(f"Nodes|{','.join([str(node) for node in ab_nodes])}") + debug_log(f"Edges|{edges}") return PartialProgramOrder(ab_nodes, edges, hs_prog) def generate_id() -> int: diff --git a/pash-spec.sh b/pash-spec.sh index ef658a78..aecf3f13 100755 --- a/pash-spec.sh +++ b/pash-spec.sh @@ -17,14 +17,14 @@ sudo bash -c "echo $protected_mem > /sys/fs/cgroup/frontier/memory.min" if [ -n "$PASH_TMP_DIR" ]; then mkdir -p $PASH_TMP_DIR/tmp/pash_spec - echo $PASH_TMP_DIR >&2 + # echo $PASH_TMP_DIR >&2 export PASH_SPEC_TMP_PREFIX="$(mktemp -d "$PASH_TMP_DIR/tmp/pash_spec/pash_XXXXXXX")" else mkdir -p /tmp/pash_spec export PASH_SPEC_TMP_PREFIX="$(mktemp -d /tmp/pash_spec/pash_XXXXXXX)" fi -echo "PASH_SPEC_TMP_PREFIX: $PASH_SPEC_TMP_PREFIX" >&2 +# echo "PASH_SPEC_TMP_PREFIX: $PASH_SPEC_TMP_PREFIX" >&2 ## Initialize the scheduler-server export PASH_SPEC_SCHEDULER_SOCKET="${PASH_SPEC_TMP_PREFIX}/scheduler_socket"