diff --git a/parallel-orch/executor.py b/parallel-orch/executor.py index d9f7ddab..51e3de36 100644 --- a/parallel-orch/executor.py +++ b/parallel-orch/executor.py @@ -5,7 +5,6 @@ import os from dataclasses import dataclass -from node import ConcreteNodeId @dataclass class ExecCtxt: @@ -25,7 +24,7 @@ class ExecResult: @dataclass class ExecArgs: command: str - concrete_node_id: ConcreteNodeId + concrete_node_id: "ConcreteNodeId" execution_id: int pre_execution_env_file: str speculate_mode: bool @@ -46,22 +45,28 @@ def run_assignment_and_return_env_file(assignment: str, pre_execution_env_file: process = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) return post_execution_env_file -def run_trace_sandboxed(args: ExecArgs): - run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command.sh' - +def run_trace(args: ExecArgs): trace_file = util.ptempfile(prefix='hs_trace') stdout_file = util.ptempfile(prefix='hs_stdout') stderr_file = util.ptempfile(prefix='hs_stderr') logging.debug(f'Scheduler: Trace file for: {args.concrete_node_id}: {trace_file}') logging.debug(f'Scheduler: Stdout file for: {args.concrete_node_id} is: {stdout_file}') logging.debug(f'Scheduler: Stderr file for: {args.concrete_node_id} is: {stderr_file}') - - sandbox_dir, tmp_dir = util.create_sandbox() post_execution_env_file = util.ptempfile(prefix='hs_post_env') - lower_dirs_str = ':'.join(args.lower_sandboxes) - speculate_mode = "speculate" if args.speculate_mode else "standard" - cmd = ["/bin/bash", run_script, args.command, trace_file, stdout_file, args.pre_execution_env_file, sandbox_dir, tmp_dir, speculate_mode, str(args.concrete_node_id), post_execution_env_file, str(args.execution_id), lower_dirs_str ] + if args.speculate_mode: + run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command_sandboxed.sh' + sandbox_dir, tmp_dir = util.create_sandbox() + lower_dirs_str = ':'.join(args.lower_sandboxes) + speculate_mode = "speculate" + cmd = ["/bin/bash", run_script, args.command, trace_file, stdout_file, args.pre_execution_env_file, sandbox_dir, tmp_dir, speculate_mode, str(args.concrete_node_id), post_execution_env_file, str(args.execution_id), lower_dirs_str] + else: + run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command_unsandboxed.sh' + sandbox_dir, tmp_dir = "", "" + lower_dirs_str = "" + speculate_mode = "standard" + cmd = ["/bin/bash", run_script, args.command, trace_file, stdout_file, args.pre_execution_env_file, speculate_mode, str(args.concrete_node_id), post_execution_env_file, str(args.execution_id)] + logging.debug(cmd) process = subprocess.Popen(cmd, stdout=None, stderr=None, preexec_fn=set_pgid) # For debugging diff --git a/parallel-orch/node.py b/parallel-orch/node.py index dfe4036f..4def46d8 100644 --- a/parallel-orch/node.py +++ b/parallel-orch/node.py @@ -3,7 +3,6 @@ import os import re import executor -from executor import ExecCtxt, ExecResult, ExecArgs import trace_v2 import util import signal @@ -268,10 +267,10 @@ class ConcreteNode: # background_sandbox: Sandbox # Exists when the node is in EXE or SPEC_EXE or after those states - exec_ctxt: ExecCtxt + exec_ctxt: executor.ExecCtxt # Exists when the node is in COMMITED or SPEC_F - exec_result: ExecResult + exec_result: executor.ExecResult # Updated when the node is loop changing and the node is transitioning # into COMMITTED or SPEC_F @@ -353,14 +352,14 @@ def is_unsafe(self): def start_command(self, env_file: str, speculate=False, speculated_nodes=None): # TODO: implement speculate # TODO: built-in commands - execute_func = executor.run_trace_sandboxed + execute_func = executor.run_trace if speculated_nodes is None: lower_sandboxes = [] else: lower_sandboxes = [node.exec_ctxt.sandbox_dir for node in reversed(speculated_nodes)] # Set the execution id self.exec_id = util.generate_id() - self.exec_ctxt = execute_func(ExecArgs(command=self.cmd, concrete_node_id=self.cnid, execution_id=self.exec_id, pre_execution_env_file=env_file, speculate_mode=speculate, lower_sandboxes=lower_sandboxes)) + self.exec_ctxt = execute_func(executor.ExecArgs(command=self.cmd, concrete_node_id=self.cnid, execution_id=self.exec_id, pre_execution_env_file=env_file, speculate_mode=speculate, lower_sandboxes=lower_sandboxes)) util.debug_log(f'Node {self.cnid} executing with pid {self.exec_ctxt.process.pid}') def execution_outcome(self) -> Tuple[int, str, str]: @@ -593,7 +592,7 @@ def start_spec_executing(self, env_file, speculated_nodes): def collect_result(self): assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING] self.exec_ctxt.process.wait() - self.exec_result = ExecResult(self.exec_ctxt.process.returncode, self.exec_ctxt.process.pid) + self.exec_result = executor.ExecResult(self.exec_ctxt.process.returncode, self.exec_ctxt.process.pid) return self.exec_result.exit_code == 137 def commit_frontier_execution(self): @@ -607,7 +606,7 @@ def commit_frontier_execution(self): self.trace_ctx = None self.update_loop_list_context() overhead_log(f"COMMIT|{self.cnid}") - executor.commit_workspace(self.exec_ctxt.sandbox_dir) + # executor.commit_workspace(self.exec_ctxt.sandbox_dir) overhead_log(f"COMMIT_END|{self.cnid}") # util.delete_sandbox(self.exec_ctxt.sandbox_dir) self.state = NodeState.COMMITTED diff --git a/parallel-orch/run_command.sh b/parallel-orch/run_command_sandboxed.sh similarity index 100% rename from parallel-orch/run_command.sh rename to parallel-orch/run_command_sandboxed.sh diff --git a/parallel-orch/run_command_unsandboxed.sh b/parallel-orch/run_command_unsandboxed.sh new file mode 100755 index 00000000..ac506546 --- /dev/null +++ b/parallel-orch/run_command_unsandboxed.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +## TODO: Not sure if it is OK and ideal to give this a string +export CMD_STRING=${1?No command was given to execute} +export TRACE_FILE=${2?No trace file path given} +export STDOUT_FILE=${3?No stdout file given} +export LATEST_ENV_FILE=${4?No env file to run with given} +export EXEC_MODE=${5?No execution mode given} +export CMD_ID=${6?No command id given} +export POST_EXEC_ENV=${7?No Riker env file given} +export EXECUTION_ID=${8?No execution id given} + +## KK 2023-04-24: Not sure this should be run every time we run a command +## GL 2023-07-08: Tests seem to pass without it +source "$PASH_TOP/compiler/orchestrator_runtime/speculative/pash_spec_init_setup.sh" + +if [ "standard" == "$EXEC_MODE" ]; then + echo $$ > /sys/fs/cgroup/frontier/cgroup.procs +elif [ "speculate" == "$EXEC_MODE" ]; then + renice 20 -p $$ >/dev/null +fi + +# mkdir -p /tmp/pash_spec/a +# mkdir -p /tmp/pash_spec/b +# export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/a/sandbox_XXXXXXX)/" +# export TEMPDIR="$(mktemp -d /tmp/pash_spec/b/sandbox_XXXXXXX)" +# echo tempdir $TEMPDIR +# echo sandbox $SANDBOX_DIR + +bash "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute.sh" > "${STDOUT_FILE}" +exit_code=$? +## Only used for debugging +# ls -R "${SANDBOX_DIR}/upperdir" 1>&2 +# out=`head -3 $SANDBOX_DIR/upperdir/$TRACE_FILE` +## Send a message to the scheduler socket +## Assumes "${PASH_SPEC_SCHEDULER_SOCKET}" is set and exported + +## Pass the proper exit code +msg="CommandExecComplete:${CMD_ID}|Exec id:${EXECUTION_ID}|Trace file:${TRACE_FILE}|Tempdir:${TEMPDIR}" +daemon_response=$(pash_spec_communicate_scheduler_just_send "$msg") # Blocking step, daemon will not send response until it's safe to continue +(exit $exit_code)