Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions parallel-orch/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os

from dataclasses import dataclass
from node import ConcreteNodeId

@dataclass
class ExecCtxt:
Expand All @@ -25,7 +24,7 @@ class ExecResult:
@dataclass
class ExecArgs:
command: str
concrete_node_id: ConcreteNodeId
concrete_node_id: "ConcreteNodeId"
execution_id: int
pre_execution_env_file: str
speculate_mode: bool
Expand All @@ -46,22 +45,28 @@ def run_assignment_and_return_env_file(assignment: str, pre_execution_env_file:
process = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return post_execution_env_file

def run_trace_sandboxed(args: ExecArgs):
run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command.sh'

def run_trace(args: ExecArgs):
trace_file = util.ptempfile(prefix='hs_trace')
stdout_file = util.ptempfile(prefix='hs_stdout')
stderr_file = util.ptempfile(prefix='hs_stderr')
logging.debug(f'Scheduler: Trace file for: {args.concrete_node_id}: {trace_file}')
logging.debug(f'Scheduler: Stdout file for: {args.concrete_node_id} is: {stdout_file}')
logging.debug(f'Scheduler: Stderr file for: {args.concrete_node_id} is: {stderr_file}')

sandbox_dir, tmp_dir = util.create_sandbox()
post_execution_env_file = util.ptempfile(prefix='hs_post_env')
lower_dirs_str = ':'.join(args.lower_sandboxes)
speculate_mode = "speculate" if args.speculate_mode else "standard"

cmd = ["/bin/bash", run_script, args.command, trace_file, stdout_file, args.pre_execution_env_file, sandbox_dir, tmp_dir, speculate_mode, str(args.concrete_node_id), post_execution_env_file, str(args.execution_id), lower_dirs_str ]
if args.speculate_mode:
run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command_sandboxed.sh'
sandbox_dir, tmp_dir = util.create_sandbox()
lower_dirs_str = ':'.join(args.lower_sandboxes)
speculate_mode = "speculate"
cmd = ["/bin/bash", run_script, args.command, trace_file, stdout_file, args.pre_execution_env_file, sandbox_dir, tmp_dir, speculate_mode, str(args.concrete_node_id), post_execution_env_file, str(args.execution_id), lower_dirs_str]
else:
run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command_unsandboxed.sh'
sandbox_dir, tmp_dir = "", ""
lower_dirs_str = ""
speculate_mode = "standard"
cmd = ["/bin/bash", run_script, args.command, trace_file, stdout_file, args.pre_execution_env_file, speculate_mode, str(args.concrete_node_id), post_execution_env_file, str(args.execution_id)]

logging.debug(cmd)
process = subprocess.Popen(cmd, stdout=None, stderr=None, preexec_fn=set_pgid)
# For debugging
Expand Down
13 changes: 6 additions & 7 deletions parallel-orch/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import re
import executor
from executor import ExecCtxt, ExecResult, ExecArgs
import trace_v2
import util
import signal
Expand Down Expand Up @@ -268,10 +267,10 @@ class ConcreteNode:
# background_sandbox: Sandbox

# Exists when the node is in EXE or SPEC_EXE or after those states
exec_ctxt: ExecCtxt
exec_ctxt: executor.ExecCtxt

# Exists when the node is in COMMITTED or SPEC_F
exec_result: ExecResult
exec_result: executor.ExecResult

# Updated when the node is loop changing and the node is transitioning
# into COMMITTED or SPEC_F
Expand Down Expand Up @@ -353,14 +352,14 @@ def is_unsafe(self):
def start_command(self, env_file: str, speculate=False, speculated_nodes=None):
# TODO: implement speculate
# TODO: built-in commands
execute_func = executor.run_trace_sandboxed
execute_func = executor.run_trace
if speculated_nodes is None:
lower_sandboxes = []
else:
lower_sandboxes = [node.exec_ctxt.sandbox_dir for node in reversed(speculated_nodes)]
# Set the execution id
self.exec_id = util.generate_id()
self.exec_ctxt = execute_func(ExecArgs(command=self.cmd, concrete_node_id=self.cnid, execution_id=self.exec_id, pre_execution_env_file=env_file, speculate_mode=speculate, lower_sandboxes=lower_sandboxes))
self.exec_ctxt = execute_func(executor.ExecArgs(command=self.cmd, concrete_node_id=self.cnid, execution_id=self.exec_id, pre_execution_env_file=env_file, speculate_mode=speculate, lower_sandboxes=lower_sandboxes))
util.debug_log(f'Node {self.cnid} executing with pid {self.exec_ctxt.process.pid}')

def execution_outcome(self) -> Tuple[int, str, str]:
Expand Down Expand Up @@ -593,7 +592,7 @@ def start_spec_executing(self, env_file, speculated_nodes):
def collect_result(self):
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]
self.exec_ctxt.process.wait()
self.exec_result = ExecResult(self.exec_ctxt.process.returncode, self.exec_ctxt.process.pid)
self.exec_result = executor.ExecResult(self.exec_ctxt.process.returncode, self.exec_ctxt.process.pid)
return self.exec_result.exit_code == 137

def commit_frontier_execution(self):
Expand All @@ -607,7 +606,7 @@ def commit_frontier_execution(self):
self.trace_ctx = None
self.update_loop_list_context()
overhead_log(f"COMMIT|{self.cnid}")
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
# executor.commit_workspace(self.exec_ctxt.sandbox_dir)
overhead_log(f"COMMIT_END|{self.cnid}")
# util.delete_sandbox(self.exec_ctxt.sandbox_dir)
self.state = NodeState.COMMITTED
Expand Down
File renamed without changes.
41 changes: 41 additions & 0 deletions parallel-orch/run_command_unsandboxed.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/bin/bash

## Run one command WITHOUT a sandbox and report its completion to the scheduler.
##
## Positional arguments (all required):
##   $1  command string to execute            -> CMD_STRING
##   $2  trace file path                      -> TRACE_FILE
##   $3  file that receives the command's stdout -> STDOUT_FILE
##   $4  env file to run with                 -> LATEST_ENV_FILE
##   $5  execution mode ("standard" or "speculate") -> EXEC_MODE
##   $6  command id                           -> CMD_ID
##   $7  post-execution env file              -> POST_EXEC_ENV
##   $8  execution id                         -> EXECUTION_ID
##
## Every argument is exported so that the child script started below can read
## it from its environment. The `${N?message}` form makes bash print `message`
## and abort the script when the N-th argument was not supplied at all (an
## empty string is still accepted).
##
## The script's final exit status is the exit status of the executed command.

## TODO: Not sure if it is OK and ideal to give this a string
export CMD_STRING=${1?No command was given to execute}
export TRACE_FILE=${2?No trace file path given}
export STDOUT_FILE=${3?No stdout file given}
export LATEST_ENV_FILE=${4?No env file to run with given}
export EXEC_MODE=${5?No execution mode given}
export CMD_ID=${6?No command id given}
## NOTE(review): the error text below still mentions "Riker"; the variable it
## guards is the post-execution env file.
export POST_EXEC_ENV=${7?No Riker env file given}
export EXECUTION_ID=${8?No execution id given}

## KK 2023-04-24: Not sure this should be run every time we run a command
## GL 2023-07-08: Tests seem to pass without it
## Presumably this is what defines pash_spec_communicate_scheduler_just_send,
## which is called at the end of this script -- TODO confirm.
source "$PASH_TOP/compiler/orchestrator_runtime/speculative/pash_spec_init_setup.sh"

## Scheduling hint that depends on the execution mode:
##  - standard:  write this shell's PID into the "frontier" cgroup's
##               cgroup.procs file (assumes that cgroup already exists --
##               TODO confirm where it is created).
##  - speculate: renice this shell so speculative work runs at low priority;
##               renice's own output is discarded.
## Any other mode value is silently left untouched.
if [ "standard" == "$EXEC_MODE" ]; then
echo $$ > /sys/fs/cgroup/frontier/cgroup.procs
elif [ "speculate" == "$EXEC_MODE" ]; then
renice 20 -p $$ >/dev/null
fi

## Disabled sandbox/tempdir setup, kept for reference. Because these lines are
## commented out, SANDBOX_DIR and TEMPDIR are NOT assigned by this script.
# mkdir -p /tmp/pash_spec/a
# mkdir -p /tmp/pash_spec/b
# export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/a/sandbox_XXXXXXX)/"
# export TEMPDIR="$(mktemp -d /tmp/pash_spec/b/sandbox_XXXXXXX)"
# echo tempdir $TEMPDIR
# echo sandbox $SANDBOX_DIR

## Run the command via the template script, sending its stdout to STDOUT_FILE.
## The exit status must be captured immediately, before any other command
## overwrites $?.
bash "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute.sh" > "${STDOUT_FILE}"
exit_code=$?
## Only used for debugging
# ls -R "${SANDBOX_DIR}/upperdir" 1>&2
# out=`head -3 $SANDBOX_DIR/upperdir/$TRACE_FILE`

## Send a message to the scheduler socket
## Assumes "${PASH_SPEC_SCHEDULER_SOCKET}" is set and exported
## NOTE(review): TEMPDIR is not assigned anywhere in this script (see the
## commented-out block above), so the "Tempdir:" field is empty unless the
## variable is inherited from the environment or set by the sourced setup
## script -- confirm the scheduler tolerates that.
msg="CommandExecComplete:${CMD_ID}|Exec id:${EXECUTION_ID}|Trace file:${TRACE_FILE}|Tempdir:${TEMPDIR}"
## The response is captured but not used further in this script.
daemon_response=$(pash_spec_communicate_scheduler_just_send "$msg") # Blocking step, daemon will not send response until it's safe to continue

## Pass the proper exit code: the subshell exits with the command's status,
## and as the last command it becomes the exit status of this script.
(exit $exit_code)