Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/pash
30 changes: 13 additions & 17 deletions parallel-orch/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
import analysis

STATE_LOG = '[STATE_LOG] '
OVERHEAD_LOG = '[OVERHEAD_LOG] '

def state_log(s):
logging.info(STATE_LOG + s)

def overhead_log(s):
logging.info(OVERHEAD_LOG + s)

class NodeState(Enum):
INIT = auto()
READY = auto()
Expand Down Expand Up @@ -532,18 +528,18 @@ def parse_env(content):
with open(other_env, 'r') as file:
other_env_vars = parse_env(file.read())

logging.debug(f"Comparing env files {self.exec_ctxt.pre_env_file} and {other_env}")
util.debug_log(f"Comparing env files {self.exec_ctxt.pre_env_file} and {other_env}")

conflict_exists = False
for key in set(node_env_vars.keys()).union(other_env_vars.keys()):
if key not in node_env_vars:
logging.debug(f"Variable {key} missing in node environment")
util.debug_log(f"Variable {key} missing in node environment")
conflict_exists = True
elif key not in other_env_vars:
logging.debug(f"Variable {key} missing in other environment")
util.debug_log(f"Variable {key} missing in other environment")
conflict_exists = True
elif node_env_vars[key] != other_env_vars[key]:
logging.debug(f"Variable {key} differs: node environment has {node_env_vars[key]}, other has {other_env_vars[key]}")
util.debug_log(f"Variable {key} differs: node environment has {node_env_vars[key]}, other has {other_env_vars[key]}")
conflict_exists = True

with open(self.exec_ctxt.pre_env_file + '.fds', 'r') as file1, open(other_env + '.fds', 'r') as file2:
Expand All @@ -559,15 +555,15 @@ def parse_env(content):
return conflict_exists

def kill_children(self):
overhead_log(f"KILL|{self.cnid}")
util.overhead_log(f"KILL|{self.cnid}")
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]
process = self.exec_ctxt.process
try:
os.killpg(process.pid, signal.SIGKILL)
except ProcessLookupError:
pass
process.wait()
overhead_log(f"KILL_END|{self.cnid}")
util.overhead_log(f"KILL_END|{self.cnid}")

def commit_fd_writes(self):
with open(self.exec_ctxt.pre_env_file + '.fds', 'r') as f:
Expand Down Expand Up @@ -615,17 +611,17 @@ def reset_to_ready(self, spec_pre_env: str = None):
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING,
NodeState.SPECULATED]

logging.info(f"Resetting node {self.id_} to ready {self.exec_id}")
state_log(f"Resetting node {self.id_} to ready {self.exec_id}")
# We reset the exec id so if we receive a message
# due to a race condition, we will ignore it.
self.exec_id = None

# TODO: make this more sophisticated
if self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]:
self.kill_children()
overhead_log(f"DELETE_SANDBOX|{self.cnid}")
util.overhead_log(f"DELETE_SANDBOX|{self.cnid}")
util.delete_sandbox(self.exec_ctxt.sandbox_dir)
overhead_log(f"DELETE_SANDBOX_END|{self.cnid}")
util.overhead_log(f"DELETE_SANDBOX_END|{self.cnid}")
self.exec_ctxt = None
self.exec_result = None
if spec_pre_env is not None:
Expand Down Expand Up @@ -671,10 +667,10 @@ def commit_frontier_execution(self):
self.trace_fd = None
self.trace_ctx = None
self.update_loop_list_context()
overhead_log(f"COMMIT|{self.cnid}")
util.overhead_log(f"COMMIT|{self.cnid}")
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
# self.commit_fd_writes()
overhead_log(f"COMMIT_END|{self.cnid}")
util.overhead_log(f"COMMIT_END|{self.cnid}")
# util.delete_sandbox(self.exec_ctxt.sandbox_dir)
self.fixup_fds()
self.state = NodeState.COMMITTED
Expand All @@ -696,10 +692,10 @@ def finish_spec_execution(self):

def commit_speculated(self):
assert self.state == NodeState.SPECULATED
overhead_log(f"COMMIT|{self.cnid}")
util.overhead_log(f"COMMIT|{self.cnid}")
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
# self.commit_fd_writes()
overhead_log(f"COMMIT_END|{self.cnid}")
util.overhead_log(f"COMMIT_END|{self.cnid}")
# util.delete_sandbox(self.exec_ctxt.sandbox_dir)
self.state = NodeState.COMMITTED
self.trace_state()
Expand Down
13 changes: 5 additions & 8 deletions parallel-orch/partial_program_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@

PROG_LOG = '[PROG_LOG] '
EVENT_LOG = '[EVENT_LOG] '
OVERHEAD_LOG = '[OVERHEAD_LOG] '

def event_log(s):
logging.info(EVENT_LOG + s)

def progress_log(s):
logging.info(PROG_LOG + s)

def overhead_log(s):
logging.info(OVERHEAD_LOG + s)
# logging.info(PROG_LOG + s)
pass

def simulate_loop_iter_env(env, var, loop_list_context, loop_iters):
loop_list = loop_list_context.get_top()
Expand Down Expand Up @@ -98,7 +95,7 @@ def get_frontier(self):
return self.frontier

def log_info(self):
logging.info(f"Nodes: {self.concrete_nodes}")
# logging.info(f"Nodes: {self.concrete_nodes}")
# logging.info(f"Adjacency: {self.adjacency}")
# logging.info(f"Inverse adjacency: {self.inverse_adjacency}")
self.log_state()
Expand Down Expand Up @@ -407,9 +404,9 @@ def valid(self):

def fetch_fs_actions(self):
for node in self.get_executing_normal_and_spec_nodes():
overhead_log(f"TRACE_FETCHING|{node.cnid}")
util.overhead_log(f"TRACE_FETCHING|{node.cnid}")
node.gather_fs_actions()
overhead_log(f"TRACE_FETCHING_END|{node.cnid}")
util.overhead_log(f"TRACE_FETCHING_END|{node.cnid}")

def _has_fs_deps(self, concrete_node_id: ConcreteNodeId):
node_of_interest : ConcreteNode = self.get_concrete_node(concrete_node_id)
Expand Down
10 changes: 5 additions & 5 deletions parallel-orch/scheduler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ def __init__(self, socket_file, window):
def handle_init(self, input_cmd: str):
assert(input_cmd.startswith("Init"))
partial_order_file = input_cmd.split(":")[1].rstrip()
logging.debug(f'Scheduler: Received partial_order_file: {partial_order_file}')
util.debug_log(f'Scheduler: Received partial_order_file: {partial_order_file}')
self.partial_program_order = util.parse_partial_program_order_from_file(partial_order_file)
util.debug_log(str(self.partial_program_order.hsprog))

def handle_wait(self, input_cmd: str, connection):
concrete_node_id, env_file = self.__parse_wait(input_cmd)
self.waiting_for_response[concrete_node_id] = connection
logging.info(f'Scheduler: Received wait message - {concrete_node_id}.')
util.debug_log(f'Scheduler: Received wait message - {concrete_node_id}.')
self.latest_env = env_file
if self.partial_program_order.pre_handle_wait(concrete_node_id, env_file):
self.partial_program_order.handle_wait(concrete_node_id, env_file)
Expand All @@ -112,13 +112,13 @@ def process_next_cmd(self):
connection.close()
self.handle_init(input_cmd)
elif (input_cmd.startswith("Daemon Start") or input_cmd == ""):
logging.info(f'Scheduler: Received daemon start message.')
util.debug_log(f'Scheduler: Received daemon start message.')
connection.close()
elif (input_cmd.startswith("CommandExecComplete:")):
node_id, exec_id, sandbox_dir, trace_file = self.__parse_command_exec_x(input_cmd)
connection.close()
if self.partial_program_order.get_concrete_node(node_id).exec_id == exec_id:
logging.info(f'Scheduler: Received command exec complete message - {node_id}.')
util.debug_log(f'Scheduler: Received command exec complete message - {node_id}.')
self.partial_program_order.handle_complete(node_id, node_id in self.waiting_for_response, self.latest_env)

if self.partial_program_order.get_concrete_node(node_id).is_committed():
Expand All @@ -128,7 +128,7 @@ def process_next_cmd(self):
self.partial_program_order.finish_wait_unsafe(node_id, self.latest_env)
self.respond_to_wait_on_unsafe(node_id)
else:
logging.info(f'Scheduler: Received command exec complete message for a killed instance, ignoring - {node_id}.')
util.debug_log(f'Scheduler: Received command exec complete message for a killed instance, ignoring - {node_id}.')
elif (input_cmd.startswith("Wait")):
self.handle_wait(input_cmd, connection)
elif (input_cmd.startswith("Done")):
Expand Down
14 changes: 9 additions & 5 deletions parallel-orch/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
def debug_log(s):
logging.debug(DEBUG_LOG + s)

def overhead_log(s):
# logging.debug(DEBUG_LOG + s)
pass

def ptempfile(prefix=''):
fd, name = tempfile.mkstemp(dir=config.PASH_SPEC_TMP_PREFIX, prefix=prefix+'_')
## TODO: Get a name without opening the fd too if possible
Expand Down Expand Up @@ -298,18 +302,18 @@ def parse_partial_program_order_from_file(file_path: str):
loop_context_end = number_of_nodes + loop_context_start
loop_context_lines = lines[loop_context_start:loop_context_end]
loop_contexts = parse_loop_contexts(loop_context_lines)
logging.debug(f'Loop contexts: {loop_contexts}')
debug_log(f'Loop contexts: {loop_contexts}')

var_assignment_lines = int(lines[loop_context_end])
var_assignment_start = loop_context_end + 1
var_assignment_end = var_assignment_start + var_assignment_lines
var_assignment_lines = lines[var_assignment_start:var_assignment_end]
var_assignments = parse_var_assignment_lines(var_assignment_lines)
logging.debug(f'Var assignments: {var_assignments}')
debug_log(f'Var assignments: {var_assignments}')

## The rest of the lines are edge_lines
edge_lines = lines[var_assignment_end:]
logging.debug(f'Edges: {edge_lines}')
debug_log(f'Edges: {edge_lines}')

ab_nodes = {}
for i in range(number_of_nodes):
Expand All @@ -330,8 +334,8 @@ def parse_partial_program_order_from_file(file_path: str):
from_id, to_id = parse_edge_line(edge_line)
edges[NodeId(from_id)].append(NodeId(to_id))

logging.info(f"Nodes|{','.join([str(node) for node in ab_nodes])}")
logging.info(f"Edges|{edges}")
debug_log(f"Nodes|{','.join([str(node) for node in ab_nodes])}")
debug_log(f"Edges|{edges}")
return PartialProgramOrder(ab_nodes, edges, hs_prog)

def generate_id() -> int:
Expand Down
4 changes: 2 additions & 2 deletions pash-spec.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ sudo bash -c "echo $protected_mem > /sys/fs/cgroup/frontier/memory.min"

if [ -n "$PASH_TMP_DIR" ]; then
mkdir -p $PASH_TMP_DIR/tmp/pash_spec
echo $PASH_TMP_DIR >&2
# echo $PASH_TMP_DIR >&2
export PASH_SPEC_TMP_PREFIX="$(mktemp -d "$PASH_TMP_DIR/tmp/pash_spec/pash_XXXXXXX")"
else
mkdir -p /tmp/pash_spec
export PASH_SPEC_TMP_PREFIX="$(mktemp -d /tmp/pash_spec/pash_XXXXXXX)"
fi

echo "PASH_SPEC_TMP_PREFIX: $PASH_SPEC_TMP_PREFIX" >&2
# echo "PASH_SPEC_TMP_PREFIX: $PASH_SPEC_TMP_PREFIX" >&2

## Initialize the scheduler-server
export PASH_SPEC_SCHEDULER_SOCKET="${PASH_SPEC_TMP_PREFIX}/scheduler_socket"
Expand Down