Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions overlay-sandbox/commit-sandbox.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash

## Commit the changes captured in an overlay sandbox back to the real
## filesystem. Takes the sandbox directory (which must contain an
## `upperdir` overlay layer) as its single required argument.
export SANDBOX_DIR=${1?No sandbox dir given}

## Assumes that the sandbox dir is called upperdir
export upperdir=upperdir

## Note: We are ignoring changes in the rikerfiles.
## These are `grep -e` options; the variable is deliberately left
## unquoted below so the shell splits it into separate grep arguments.
ignore_patterns="-e .rkr -e Rikerfile"
echo "Ignoring changes in: $ignore_patterns"

## List every regular file written in the overlay's upper layer,
## filtering out the ignored patterns. `find` on the directory (rather
## than a `dir/*` glob) also works when the upperdir is empty.
changed_files=$(find "${SANDBOX_DIR}/${upperdir}" -type f | grep -v ${ignore_patterns})

if [ -n "$changed_files" ]; then
    echo "Changes detected in the following files:"
    echo "$changed_files"

    # commit fails in directories the user does not have access to
    # even though it ran successfully in unshare, so we
    # attempt to copy each changed file back to its original location
    while IFS= read -r changed_file; do
        ## Stripping the sandbox prefix recovers the original path.
        if ! cp "$changed_file" "${changed_file#$SANDBOX_DIR/$upperdir}"; then
            echo "Error: Failed to copy $changed_file" >&2
            exit 1
        fi
    done <<< "$changed_files"
    echo "Changes committed"
else
    echo "No changes detected"
fi
8 changes: 3 additions & 5 deletions overlay-sandbox/run-sandboxed.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ export script_to_execute=${1?Script to execute not given}
## Find the source code top directory
export PASH_SPEC_TOP=${PASH_SPEC_TOP:-$(git rev-parse --show-toplevel --show-superproject-working-tree)}

## Generate a temporary directory to store the workfiles
mkdir -p /tmp/pash_spec
export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/sandbox_XXXXXXX)/"

export start_dir="$PWD"

echo "Overlay directory: ${SANDBOX_DIR}"
Expand Down Expand Up @@ -49,4 +45,6 @@ unshare --mount --map-root-user --user --pid --fork "${PASH_SPEC_TOP}/overlay-sa
## the sandbox (the first command in the workset)
## (2) the writeset of the main command (actually if they are
## the same, we could just make sure to commit them in order).
"${PASH_SPEC_TOP}/overlay-sandbox/check_changes_in_overlay.sh" "${SANDBOX_DIR}"

# We now commit at a later stage
# "${PASH_SPEC_TOP}/overlay-sandbox/commit-sandbox.sh" "${SANDBOX_DIR}"
16 changes: 14 additions & 2 deletions parallel-orch/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,19 @@ def async_run_and_trace_command_in_sandbox(command, trace_file):
process = async_run_and_trace_command(command, trace_file, sandbox_mode=True)
return process

def commit_workspace(workspace_path):
## Call commit-sandbox.sh to commit the uncommitted sandbox to the main workspace
run_script = f'{config.PASH_SPEC_TOP}/overlay-sandbox/commit-sandbox.sh'
args = ["/bin/bash", run_script, workspace_path]
process = subprocess.check_output(args)
return process

## Read trace and capture each command
def read_trace(trace_file):
with open(trace_file) as f:
def read_trace(sandbox_dir, trace_file):
if sandbox_dir == "":
path = trace_file
else:
path = f"{sandbox_dir}upperdir/{trace_file}"

with open(path) as f:
return f.readlines()
70 changes: 28 additions & 42 deletions parallel-orch/partial_program_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def __init__(self, nodes, edges):
self.commands_currently_executing = {}
self.to_be_resolved = {}
self.waiting_to_be_resolved = set()
## Contains the most recent sandbox directory paths
self.sandbox_dirs = {}

def __str__(self):
return f"NODES: {len(self.nodes.keys())} | ADJACENCY: {self.adjacency}"
Expand Down Expand Up @@ -155,17 +157,6 @@ def get_transitive_closure_if_can_be_resolved(self, can_be_resolved: list, targe
next_work.extend(new_next)
return list(all_next_transitive)

# def get_inverse_transitive_closure_if_can_be_resolved(self, can_be_resolved: list, target_node_ids: list) -> list:
# all_next_transitive = set(target_node_ids)
# next_work = target_node_ids.copy()
# while len(next_work) > 0:
# node_id = next_work.pop()
# successors = {next_node_id for next_node_id in self.get_next(node_id) if next_node_id in can_be_resolved}
# new_next = successors - all_next_transitive
# all_next_transitive = all_next_transitive.union(successors)
# next_work.extend(new_next)
# return list(all_next_transitive)

def is_frontier(self, node_id: int) -> bool:
return node_id in self.frontier

Expand Down Expand Up @@ -229,6 +220,7 @@ def find_cmds_to_resolve(self, cmd_ids_to_check: list):
## Forward dependency is when a command's output is the same
## as the input of a following command
def resolve_dependencies_continuous(self, new_node_id):
self.log_partial_program_order_info()
# We want to check every single command that has already finished executing but
# not yet able to be resolved
logging.debug("Finding sets of commands that can be resolved after {new_node_id} finished executing")
Expand All @@ -241,10 +233,9 @@ def resolve_dependencies_continuous(self, new_node_id):
# do nothing and wait until a new command finishes executing
if len(cmds_to_resolve) == 0:
logging.debug("No resolvable nodes were found in this round, nothing will change...")
return
return []

# Init stuff
independent_cmds_this_cycle = set(cmds_to_resolve)
new_workset = set()
old_workset = self.workset.copy()
logging.debug(" --- Starting dependency resolution --- ")
Expand All @@ -263,12 +254,13 @@ def resolve_dependencies_continuous(self, new_node_id):
elif self.has_backward_dependency(first_cmd_id, second_cmd_id):
logging.debug(f' > Command {second_cmd_id} was added to the workset, due to a backward dependency with {first_cmd_id}')
new_workset.add(second_cmd_id)
elif self.has_write_dependency(first_cmd_id, second_cmd_id):
logging.debug(f' > Command {second_cmd_id} was added to the workset, due to a write dependency with {first_cmd_id}')
new_workset.add(second_cmd_id)
elif self.has_forward_dependency(first_cmd_id, second_cmd_id):
logging.debug(f' > Command {second_cmd_id} was added to the workset, due to a forward dependency with {first_cmd_id}')
new_workset.add(second_cmd_id)
## No need to handle write dependencies anymore, as selective commit order solves this issue
# elif self.has_write_dependency(first_cmd_id, second_cmd_id):
# logging.debug(f' > Command {second_cmd_id} was added to the workset, due to a write dependency with {first_cmd_id}')
# new_workset.add(second_cmd_id)

logging.debug(" > Modifying speculated set accordingly")
old_speculated = self.speculated.copy()
Expand All @@ -281,24 +273,14 @@ def resolve_dependencies_continuous(self, new_node_id):

old_committed = self.committed.copy()
old_frontier = self.frontier.copy()
logging.debug(self.workset)
logging.debug(self.get_currently_executing())
logging.debug(self.frontier)

# TODO: ideally move this to the point
# we start executing a new command
self.step_forward(old_speculated, old_committed)

self.log_partial_program_order_info()
return self.committed - old_committed

logging.debug(f"Commands checked this cycle: {sorted(cmds_to_resolve)}")
logging.debug(f"Workset old: {old_workset}")
logging.debug(f"Workset new: {self.workset}")
logging.debug(f"Speculated old: {old_speculated}")
logging.debug(f"Speculated new: {self.speculated}")
logging.debug(f"Committed old: {old_committed}")
logging.debug(f"Committed new: {self.committed}")
logging.debug(f"Frontier old: {old_frontier}")
logging.debug(f"Frontier new: {self.frontier}")

def step_forward(self, old_speculated, old_committed):
logging.debug(" > Committing frontier")
Expand Down Expand Up @@ -398,28 +380,31 @@ def speculate_cmd_non_blocking(self, node_id: int):
logging.debug(f'Read trace from: {trace_file}')
self.commands_currently_executing[node_id] = (proc, trace_file)

def command_execution_completed(self, node_id: int):
def command_execution_completed(self, node_id: int, sandbox_dir: str):
self.sandbox_dirs[node_id] = sandbox_dir
_proc, trace_file = self.commands_currently_executing.pop(node_id)
trace_object = executor.read_trace(trace_file)
trace_object = executor.read_trace(sandbox_dir, trace_file)
read_set, write_set = trace.parse_and_gather_cmd_rw_sets(trace_object)
rw_set = RWSet(read_set, write_set)
self.update_rw_set(node_id, rw_set)
logging.debug(f" --- Node {node_id}, just finished execution ---")
self.resolve_dependencies_continuous(node_id)

def log_rw_sets(self, logging):
logging.debug("====== |RW Sets| ======")
for node_id, rw_set in self.rw_sets.items():
logging.debug(f"ID: {node_id}")
logging.debug(f"Read: {rw_set.get_read_set()}")
logging.debug(f"Write: {rw_set.get_write_set()}")
to_commit = self.resolve_dependencies_continuous(node_id)
self.commit_cmd_workspaces(to_commit)

def commit_cmd_workspaces(self, to_commit_ids):
logging.debug(len(to_commit_ids))
for cmd_id in to_commit_ids:
workspace = self.sandbox_dirs[cmd_id]
if workspace != "":
logging.debug(f" (!) Committing workspace of cmd {cmd_id} found in {workspace}")
executor.commit_workspace(workspace)
else:
logging.debug(f" (!) No need to commit workspace of cmd {cmd_id} as it was run in the main workspace")

def log_rw_sets(self):
logging.debug("====== |RW Sets| ======")
logging.debug("====== RW Sets " + "=" * 65)
for node_id, rw_set in self.rw_sets.items():
logging.debug(f"ID: {node_id}")
logging.debug(f"Read: {len(rw_set.get_read_set()) if rw_set is not None else None}")
logging.debug(f"Write: {len(rw_set.get_write_set()) if rw_set is not None else None}")
logging.debug(f"ID:{node_id} | R:{len(rw_set.get_read_set()) if rw_set is not None else None} | W:{len(rw_set.get_write_set()) if rw_set is not None else None}")

def log_partial_program_order_info(self):
logging.debug(f"=" * 80)
Expand All @@ -430,6 +415,7 @@ def log_partial_program_order_info(self):
logging.debug(f"EXECUTING: {list(self.commands_currently_executing.keys())}")
logging.debug(f"WAITING: {sorted(list(self.waiting_to_be_resolved))}")
logging.debug(f"TO RESOLVE: {self.to_be_resolved}")
self.log_rw_sets()
logging.debug(f"=" * 80)

def populate_to_be_resolved_dict(self, old_committed):
Expand Down
10 changes: 7 additions & 3 deletions parallel-orch/run_command.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ fi
echo "Execution mode: $EXEC_MODE"

if [ $sandbox_flag -eq 1 ]; then
"${PASH_SPEC_TOP}/overlay-sandbox/run-sandboxed.sh" "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute_in_overlay.sh"
## Generate a temporary directory to store the workfiles
echo "In sandbox mode"
mkdir -p /tmp/pash_spec
export SANDBOX_DIR="$(mktemp -d /tmp/pash_spec/sandbox_XXXXXXX)/"
"${PASH_SPEC_TOP}/overlay-sandbox/run-sandboxed.sh" "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute_in_overlay.sh" "${SANDBOX_DIR}"
else
echo "In standard mode"
export SANDBOX_DIR=""
"${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute_in_overlay.sh"
fi

Expand All @@ -31,6 +36,5 @@ fi

## TODO: Pass the proper exit code
exit_code=0
msg="CommandExecComplete:${CMD_ID}|Exit code:${exit_code}"
msg="CommandExecComplete:${CMD_ID}|Exit code:${exit_code}|Sandbox dir:${SANDBOX_DIR}"
daemon_response=$(pash_spec_communicate_scheduler_just_send "$msg") # Blocking step, daemon will not send response until it's safe to continue

10 changes: 6 additions & 4 deletions parallel-orch/scheduler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import signal
import time

from util import *
from partial_program_order import parse_partial_program_order_from_file
Expand Down Expand Up @@ -108,7 +109,8 @@ def __parse_command_exec_complete(self, input_cmd: str) -> "tuple[int, int]":
components = input_cmd.rstrip().split("|")
command_id = int(components[0].split(":")[1])
exit_code = int(components[1].split(":")[1])
return command_id, exit_code
sandbox_dir = components[2].split(":")[1]
return command_id, exit_code, sandbox_dir
except:
raise Exception(f'Parsing failure for line: {input_cmd}')

Expand All @@ -123,10 +125,11 @@ def handle_command_exec_complete(self, input_cmd: str):
logging.debug(f'Command exec complete: {input_cmd}')

## Read the node id from the command argument
cmd_id, exit_code = self.__parse_command_exec_complete(input_cmd)
cmd_id, exit_code, sandbox_dir = self.__parse_command_exec_complete(input_cmd)


## Gather RWset, resolve dependencies, and progress graph
self.partial_program_order.command_execution_completed(cmd_id)
self.partial_program_order.command_execution_completed(cmd_id, sandbox_dir)

# self.partial_program_order.log_partial_program_order_info()

Expand Down Expand Up @@ -195,7 +198,6 @@ def run(self):
# TODO: ec checks fail for now
if len(self.partial_program_order.workset) == 0:
self.done = True


self.socket.close()
shutdown()
Expand Down
4 changes: 4 additions & 0 deletions test/sleep_and_cat.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

## Test helper: sleep for a given duration, then copy one file to another.
##   $1 - seconds to sleep
##   $2 - input file to read
##   $3 - output file to write
## Quoting the expansions keeps paths with spaces (or empty args) from
## being word-split or glob-expanded.
sleep "$1"
cat "$2" > "$3"
38 changes: 26 additions & 12 deletions test/test_orch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export WORKING_DIR="$ORCH_TOP/test/output_orch"
echo "${WORKING_DIR}"

bash="bash"
orch="$ORCH_TOP/pash-spec.sh"
orch="$ORCH_TOP/pash-spec.sh -d 100"

test_dir_orch="$ORCH_TOP/test/test_scripts_orch"
test_dir_bash="$ORCH_TOP/test/test_scripts_bash"
Expand Down Expand Up @@ -49,9 +49,9 @@ run_test()
diff -q "$output_dir_orch/" "$output_dir_bash/"
test_diff_ec=$?

## Check if the two exit codes are both success or both error
{ [ $test_bash_ec -eq 0 ] && [ $test_pash_ec -eq 0 ]; } || { [ $test_bash_ec -ne 0 ] && [ $test_pash_ec -ne 0 ]; }
test_ec=$?
# ## Check if the two exit codes are both success or both error
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO that we comment this out until we actually fix exit code handling in the system

# { [ $test_bash_ec -eq 0 ] && [ $test_pash_ec -eq 0 ]; } || { [ $test_bash_ec -ne 0 ] && [ $test_pash_ec -ne 0 ]; }
# test_ec=$?

if [ $test_diff_ec -ne 0 ]; then
echo -n "$test output mismatch "
Expand Down Expand Up @@ -98,15 +98,26 @@ test3()
$shell $2/semi_dependent_greps.sh
}


## TODO: Should fail at the moment
test4()
{
local shell=$1
echo $'foo\nbar\nbaz\nqux\nquux\nfoo\nbar' > $3/in1
echo $'foo\nbar\nbaz\nqux\nquux\nfoo\nbar' > $3/in2
echo $'foo\nbar\nbaz\nqux\nquux\nfoo\nbar' > $3/in3
$shell $2/semi_dependent_greps_2.sh
echo 'hello1' > $3/in1
echo 'hello2' > $3/in2
$shell $2/test4.sh
}

test5()
{
local shell=$1
echo 'hello1' > $3/in1
echo 'hello2' > $3/in2
$shell $2/test5.sh
}

test6()
{
local shell=$1
$shell $2/test6.sh
}

# We run all tests composed with && to exit on the first that fails
Expand All @@ -118,8 +129,11 @@ if [ "$#" -eq 0 ]; then
cleanup
run_test test3
cleanup
## TODO: Fails at the moment, uncomment when fixed
# run_test test4
run_test test4
cleanup
run_test test5
cleanup
run_test test6
else
for testname in $@
do
Expand Down
3 changes: 3 additions & 0 deletions test/test_scripts_bash/test4.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Test fixture (bash reference run). The slow first command (sleep 2) and
## the fast third command (sleep 1) both write ./output_bash/out1 —
## presumably this exercises out-of-order completion with a write-write
## conflict on out1; NOTE(review): confirm intent against the orch variant.
./sleep_and_cat.sh 2 ./output_bash/in1 ./output_bash/out1
cat ./output_bash/out1 > ./output_bash/out2
./sleep_and_cat.sh 1 ./output_bash/in2 ./output_bash/out1
3 changes: 3 additions & 0 deletions test/test_scripts_bash/test5.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Test fixture (bash reference run). The fast third command (sleep 1)
## overwrites ./output_bash/in1, which the slow first command (sleep 2)
## reads — presumably this exercises a write-after-read conflict;
## NOTE(review): confirm intent against the orch variant.
./sleep_and_cat.sh 2 ./output_bash/in1 ./output_bash/out1
cat ./output_bash/out1 > ./output_bash/out2
./sleep_and_cat.sh 1 ./output_bash/in2 ./output_bash/in1
10 changes: 10 additions & 0 deletions test/test_scripts_bash/test6.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
## Test fixture (bash reference run): write ten independent output files,
## out0..out9, each containing its matching "helloN" line. No command
## depends on any other.
for idx in {0..9}; do
  echo "hello$idx" > "./output_bash/out$idx"
done
3 changes: 3 additions & 0 deletions test/test_scripts_orch/test4.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Test fixture (orchestrator run). The slow first command (sleep 2) and
## the fast third command (sleep 1) both write ./output_orch/out1 —
## presumably this exercises out-of-order completion with a write-write
## conflict on out1; NOTE(review): confirm intent against the bash variant.
./sleep_and_cat.sh 2 ./output_orch/in1 ./output_orch/out1
cat ./output_orch/out1 > ./output_orch/out2
./sleep_and_cat.sh 1 ./output_orch/in2 ./output_orch/out1
3 changes: 3 additions & 0 deletions test/test_scripts_orch/test5.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Test fixture (orchestrator run). The fast third command (sleep 1)
## overwrites ./output_orch/in1, which the slow first command (sleep 2)
## reads — presumably this exercises a write-after-read conflict;
## NOTE(review): confirm intent against the bash variant.
./sleep_and_cat.sh 2 ./output_orch/in1 ./output_orch/out1
cat ./output_orch/out1 > ./output_orch/out2
./sleep_and_cat.sh 1 ./output_orch/in2 ./output_orch/in1
Loading