Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions overlay-sandbox/check_changes_in_overlay.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@ export SANDBOX_DIR=${1?No sandbox dir given}

## Assumes that the sandbox dir is called upperdir
export upperdir=upperdir
changed_files=`find ${SANDBOX_DIR}/${upperdir}/* -type f`
## Note: We are ignoring changes in the rikerfiles
ignore_patterns="-e .rkr -e Rikerfile"
echo "Ignoring changes in: $ignore_patterns"
changed_files=`find ${SANDBOX_DIR}/${upperdir}/* -type f | grep -v ${ignore_patterns}`

if [ ! -z "$changed_files" ]; then
echo "Changes detected in the following files:"
echo "$changed_files"
echo -n "Commit changes? [y/n]: "
read commit
# echo -n "Commit changes? [y/n]: "

## TODO: Hardcoding always commit for now. Later
## we need to include this logic in the prototype.
# read commit
commit="y"

# commit fails in directories the user does not has access
# even though it ran successfully in unshare
if [ "$commit" == "y" ]; then
Expand Down
6 changes: 4 additions & 2 deletions overlay-sandbox/mount-and-execute.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
export SANDBOX_DIR=${1?No sandbox dir given}
export script_to_execute=${2?No script is given to execute}

ls / | xargs -I '{}' mount -t overlay overlay -o lowerdir=/'{}',upperdir="$SANDBOX_DIR"/upperdir/'{}',workdir="$SANDBOX_DIR"/workdir/'{}' "$SANDBOX_DIR"/temproot/'{}'
## TODO: Figure out if we need these directories and if we do, we need to get them some other way than the overlay
ignore_directories="-e proc -e dev -e proj -e run -e sys"
ls / | grep -v ${ignore_directories} | xargs -I '{}' mount -t overlay overlay -o lowerdir=/'{}',upperdir="$SANDBOX_DIR"/upperdir/'{}',workdir="$SANDBOX_DIR"/workdir/'{}' "$SANDBOX_DIR"/temproot/'{}'

# TODO: use unshare instead of chroot
# Alternatively, have a look at this
# NOTE: installed version of unshare does not support --root option
chroot "$SANDBOX_DIR/temproot" /bin/bash -c "cd $start_dir && source ${script_to_execute}"
chroot "$SANDBOX_DIR/temproot" /bin/bash -c "mount -t proc proc /proc && cd $start_dir && source ${script_to_execute}"

19 changes: 17 additions & 2 deletions overlay-sandbox/run-sandboxed.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ ls / | xargs -I '{}' mkdir "${SANDBOX_DIR}"/temproot/'{}' "${SANDBOX_DIR}"/workd
# the newly created user namespace.
# --user: The process will have a distinct set of UIDs, GIDs and
# capabilities.
unshare --mount --map-root-user --user "${PASH_SPEC_TOP}/overlay-sandbox/mount-and-execute.sh" "${SANDBOX_DIR}" "${script_to_execute}"

# --pid: Create a new process namespace
# KK: As far as I understand this is necessary so that
# procfs can be mounted and internal commands see it properly.
# --fork: Seems necessary if we do --pid
# KK: Not sure why!
unshare --mount --map-root-user --user --pid --fork "${PASH_SPEC_TOP}/overlay-sandbox/mount-and-execute.sh" "${SANDBOX_DIR}" "${script_to_execute}"

## TODO: Instead of running this code here always, we need to run
## it inside orch.py and essentially find the W dependencies
## of each command (which should also in theory be given by Riker).
##
## Then, we need to commit changes only if these write dependencies are
## independent from:
## (1) the readset of the main command that was run without
## the sandbox (the first command in the workset)
## (2) the writeset of the main command (actually if they are
## the same, we could just make sure to commit them in order).
"${PASH_SPEC_TOP}/overlay-sandbox/check_changes_in_overlay.sh" "${SANDBOX_DIR}"
18 changes: 18 additions & 0 deletions parallel-orch/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os
import subprocess

##
## Contains global configuration information and later arguments and other global
## things should be moved here.
##

## Command that prints the repository top-level directory; used as a
## fallback when PASH_SPEC_TOP is not exported in the environment.
GIT_TOP_CMD = ['git', 'rev-parse', '--show-toplevel', '--show-superproject-working-tree']

## Root of the pash-spec checkout: prefer the environment variable,
## otherwise ask git for the repository top level.
PASH_SPEC_TOP = os.environ.get('PASH_SPEC_TOP')
if PASH_SPEC_TOP is None:
    PASH_SPEC_TOP = subprocess.run(
        GIT_TOP_CMD,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    ).stdout.rstrip()


## Ensure that PASH_SPEC_TMP_PREFIX is set by pa.sh.
## NOTE: a bare `assert` is stripped under `python -O`, so validate explicitly.
PASH_SPEC_TMP_PREFIX = os.getenv('PASH_SPEC_TMP_PREFIX')
if PASH_SPEC_TMP_PREFIX is None:
    raise RuntimeError('PASH_SPEC_TMP_PREFIX is not set (it must be set by pa.sh)')
37 changes: 37 additions & 0 deletions parallel-orch/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import config
import subprocess

# This module executes a sequence of commands
# and traces them with Riker.
# TODO: isolate the execution of the [1:N] commands
# with overlay.


## TODO: Modify this function to just run one command
def async_run_and_trace_command(command, trace_file, sandbox_mode=False):
    """Launch `run_command.sh` for `command` without waiting for it.

    The command is executed (and traced by Riker) by the helper script;
    the spawned `subprocess.Popen` handle is returned so the caller can
    wait on it later.
    """
    ## Call Riker to execute the command
    run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command.sh'
    if not sandbox_mode:
        print(" -- Standard mode")
    mode = "sandbox" if sandbox_mode else "standard"
    cmd_line = ["/bin/bash", run_script, command, trace_file, mode]
    return subprocess.Popen(cmd_line, stdout=subprocess.DEVNULL)

def async_run_and_trace_command_in_sandbox(command, trace_file):
    """Launch `command` (traced by Riker) inside the overlay sandbox."""
    ## TODO: Run all the following in a sandbox
    return async_run_and_trace_command(command, trace_file, sandbox_mode=True)

## Write a Rikerfile with these commands to execute them
def write_cmds_to_rikerfile(cmds_to_run, rikerfile_path="Rikerfile"):
    """Write each command on its own line to a Rikerfile.

    Args:
        cmds_to_run: iterable of command strings, one Riker step each.
        rikerfile_path: destination file; defaults to "Rikerfile" in the
            current working directory (the original hard-coded behavior).
    """
    with open(rikerfile_path, "w") as f:
        f.writelines(cmd + "\n" for cmd in cmds_to_run)

## Read trace and capture each command
def read_trace(trace_file):
    """Return the trace file contents as a list of lines (newlines kept)."""
    with open(trace_file) as trace_fd:
        trace_lines = trace_fd.readlines()
    return trace_lines
160 changes: 141 additions & 19 deletions parallel-orch/orch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from argparse import ArgumentParser
import sys
import logging
import tracer
import executor
import trace
import util

# TODO: Currently cmd_execution_info does not create correct r/w sets for
# commands with same first part but different redir.
Expand Down Expand Up @@ -77,6 +78,36 @@ def log_simplified(self):
logging.debug(f"W:{[ref_name for ref_name in self.write_set if ref_name in file_name_pool]}")
logging.debug(f"COMMITED:{self.commited}\n")


## Currently this abstracts a list of cmds
##
## In the future we will modify it to be a partial order
class Workset:
    """Ordered collection of command ids scheduled for (re-)execution."""

    def __init__(self, list_of_cmds):
        # Underlying storage; kept as a plain list for now.
        self.list_of_cmds = list_of_cmds

    def __len__(self):
        return len(self.list_of_cmds)

    def __iter__(self):
        # Generator-based delegation; also makes `in` checks work.
        yield from self.list_of_cmds

    def get_first(self):
        """Return the head command id."""
        return self.list_of_cmds[0]

    def get_rest(self):
        """Return every command id after the head, in order."""
        return self.list_of_cmds[1:]

    def insert_at_end(self, cmd_id):
        """Append `cmd_id` at the end of the workset."""
        self.list_of_cmds.append(cmd_id)

    def get_all_enumerate(self):
        """Return (index, cmd_id) pairs over the whole workset."""
        return enumerate(self.list_of_cmds)

    ## Needs to be called after get_all_enumerate
    def get_suffix(self, i):
        """Return the command ids strictly after position `i`."""
        return self.list_of_cmds[i + 1:]

## cmd_execution_info is a dictionary containing information about each command.
## id : Cmd_exec_info (id, command, read set, write set, is commited)
def generate_cmd_execution_info(cmds_to_run):
Expand Down Expand Up @@ -140,29 +171,53 @@ def add_launch_assignments_to_rw_sets(cmd_execution_info, trace_object):
cmd_execution_info[launch_name].add_to_write_set(trace.get_path_ref_name(path_ref))
return cmd_execution_info



def has_forward_dependency(cmd_execution_info, first, second):
    """Return True if `second` reads a path that `first` writes.

    `cmd_execution_info` maps command id -> object exposing `read_set`
    and `write_set` (sets of path reference names).
    """
    # NOTE: removed a leftover debug print of the whole info dict.
    first_write_set = cmd_execution_info[first].write_set
    second_read_set = cmd_execution_info[second].read_set
    # We want the write set of the first command to not have
    # common elements with the read set of the second command,
    # otherwise the second is forward-dependent
    return not first_write_set.isdisjoint(second_read_set)

def has_backward_dependency(cmd_execution_info, first, second):
    """Return True if `second` writes a path that `first` reads."""
    # Locals renamed: they previously claimed to be write/read sets
    # while actually holding the read/write sets respectively.
    first_read_set = cmd_execution_info[first].read_set
    second_write_set = cmd_execution_info[second].write_set
    # We want the read set of the first command to not have
    # common elements with the write set of the second command,
    # otherwise the second is backward-dependent
    return not first_read_set.isdisjoint(second_write_set)

def has_write_dependency(cmd_execution_info, first, second):
    """Return True if `first` and `second` write a common path."""
    # Local renamed: it previously was called `second_read_set`
    # while actually holding the second command's WRITE set.
    first_write_set = cmd_execution_info[first].write_set
    second_write_set = cmd_execution_info[second].write_set
    # We want the write set of the first command to not have
    # common elements with the write set of the second command,
    # otherwise the second is write-dependent
    return not first_write_set.isdisjoint(second_write_set)

## Resolve all the dependencies and update the workset.
## A forward dependency is when a command's output is the same
## as the input of a following command (backward and write
## dependencies are analogous for read/write and write/write sets).
def check_dependencies(cmd_execution_info, workset):
    """Build the next workset from all pairwise dependencies.

    A later command enters the new workset (once) if it has a forward,
    backward, or write dependency on any earlier command in `workset`.
    Returns a new `Workset`.
    """
    new_workset = Workset([])
    for i, first_cmd_id in workset.get_all_enumerate():
        for second_cmd_id in workset.get_suffix(i):
            # TODO: Optimization, maybe we could not run
            # Configurable
            # Priorities
            #
            # Hoisted: the membership guard was repeated in every branch.
            if second_cmd_id in new_workset:
                continue
            if has_forward_dependency(cmd_execution_info, first_cmd_id, second_cmd_id):
                new_workset.insert_at_end(second_cmd_id)
                logging.debug(f"Forward dependency: {first_cmd_id}, {second_cmd_id}")
            elif has_backward_dependency(cmd_execution_info, first_cmd_id, second_cmd_id):
                new_workset.insert_at_end(second_cmd_id)
                logging.debug(f"Backward dependency: {first_cmd_id}, {second_cmd_id}")
            elif has_write_dependency(cmd_execution_info, first_cmd_id, second_cmd_id):
                new_workset.insert_at_end(second_cmd_id)
                logging.debug(f"Write dependency: {first_cmd_id}, {second_cmd_id}")
    return new_workset

def workset_cmds_to_list(cmd_execution_info):
Expand All @@ -184,15 +239,85 @@ def execute_workset_and_find_rw_dependencies(cmd_execution_info, workset):
## Warning! HACK: Remove these functions in later iteration
## cmd_execution_info is converted to cmd-based dict (instead of id)
cmd_exec_info_cmd_based_dict = convert_cmd_exec_info_to_cmd_based_dict(cmd_execution_info)


trace_objects = run_and_trace_workset(workset, cmd_execution_info)

## TODO: Fix the rest of code to work with a trace dictionary
## from command ids to trace objects

## HACK: Just to make tests run for now we concatenate all traces into a big trace
## to just run tests and code as it was.
## The good thing is that now we have a trace for each cmd_id
## and therefore we can parse dependencies even easier and better!
trace_object = []
for cmd_id in sorted(trace_objects.keys()):
trace_obj = trace_objects[cmd_id]
trace_object += trace_obj

## HACK: Convert workset from id list to cmd list. Same as above
cmd_workset = [cmd_execution_info[cmd_id].cmd for cmd_id in workset]
trace = tracer.run_and_trace_workset(cmd_workset, OUTPUT_TRACE_FILE)
# Changes are made on the cmd-based structures
cmd_exec_info_cmd_based_dict = extract_rw_sets_from_trace(cmd_exec_info_cmd_based_dict, cmd_workset, trace)
cmd_exec_info_cmd_based_dict = extract_rw_sets_from_trace(cmd_exec_info_cmd_based_dict, cmd_workset, trace_object)
## HACK: Remove in later iteration
## above conversion is reverted back to id based
return convert_cmd_exec_info_cmd_based_to_id_based_dict(cmd_exec_info_cmd_based_dict)

## TODO: In order to be able to combine forward and backward dependencies
## we need to execute all except the first cmd in a sandbox (and riker in the sandbox)
##
## We need to change this function to write the first cmd in a rikerfile
## and then iterate on all others, run them in a sandbox and then put
## them in a rikerfile there, and run them there.
##
## It is likely that this then requires work on the traces, modifying them
## to be correct for the orch.
##
## The other big thing is to then decide whether to commit each of the sandboxes
## or not. If there is ANY dependency we want to not commit the sandbox.
##
## NOTE: There are two different types of commits, the sandbox commit,
## which just means execute the command and see its effects,
## and the orchestrator commit, which means that this command
## has completed and will never run again (and all its prefix has also completed).
def run_and_trace_workset(workset, cmd_execution_info):
    """Execute every command in the workset and collect one trace each.

    The first command runs directly under Riker; every other command is
    executed inside an overlay sandbox. Returns a dict mapping command
    id -> trace object.
    """
    procs_by_id = {}

    ## The head of the workset runs just with riker (no sandbox).
    head_id = workset.get_first()
    head_cmd = cmd_execution_info[head_id].cmd
    head_trace_file = util.ptempfile()
    print("First command:", head_cmd, "trace will be saved in:", head_trace_file)
    head_proc = executor.async_run_and_trace_command(head_cmd, head_trace_file)
    procs_by_id[head_id] = (head_proc, head_trace_file)

    ## All remaining commands are launched in sandboxes, asynchronously.
    for cid in workset.get_rest():
        cmd = cmd_execution_info[cid].cmd
        tfile = util.ptempfile()
        print("Command:", cmd, "trace will be saved in:", tfile)
        proc = executor.async_run_and_trace_command_in_sandbox(cmd, tfile)
        procs_by_id[cid] = (proc, tfile)

    ## TODO: Figure out a way to wait on any process
    ##       and not wait on them in sequence as we do now.
    for cid in sorted(procs_by_id.keys()):
        proc, _tfile = procs_by_id[cid]
        proc.wait()

    ## Gather the trace produced by each command.
    trace_objects = {}
    for cid, (_proc, tfile) in procs_by_id.items():
        trace_objects[cid] = executor.read_trace(tfile)

    ## Returns a dictionary of traces, one for each command id
    return trace_objects

def scheduling_algorithm(cmds_to_run):
## create initial Cmd_exec_info objects for each parsed cmd
## TODO: this implementation does not allow duplicate commands in the workset, change it.
Expand All @@ -202,7 +327,7 @@ def scheduling_algorithm(cmds_to_run):
# cmd_to_id = generate_cmd_to_id(cmd_execution_info)

# The workset contains all the command ids that are going to be traced in the current cycle
workset = [cmd.id for cmd in cmd_execution_info.values()]
workset = Workset([cmd.id for cmd in cmd_execution_info.values()])
# Count tracing cycles
reps = 1
## Parse trace
Expand All @@ -214,7 +339,7 @@ def scheduling_algorithm(cmds_to_run):
cmd_execution_info = execute_workset_and_find_rw_dependencies(cmd_execution_info, workset)
cmd_execution_info_simplified(cmd_execution_info)
# Check forward dependencies and update workset accordingly
workset = check_forward_dependencies(cmd_execution_info, workset)
workset = check_dependencies(cmd_execution_info, workset)
reps += 1

def main():
Expand All @@ -226,10 +351,7 @@ def main():

## Just work with files in this pool for now
## TODO: Extend to work with all file references
file_name_pool = ["./output_orch/in1", "./output_orch/in2", "./output_orch/in3",
"./output_orch/in4", "./output_orch/in5", "./output_orch/in6" ,
"./output_orch/out1", "./output_orch/out2", "./output_orch/out3",
"./output_orch/out4", "./output_orch/out5", "./output_orch/out6"]
file_name_pool = ["./in1", "./out1", "out1", "in1"]

args = parse_args()

Expand Down
26 changes: 26 additions & 0 deletions parallel-orch/run_command.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash

## Run one command under riker tracing, either directly ("standard")
## or inside the overlay sandbox ("sandbox").
##
## TODO: Not sure if it is OK and ideal to give this a string
export CMD_STRING=${1?No command was given to execute}
export TRACE_FILE=${2?No trace file path given}
export EXEC_MODE=${3?No execution mode given}

if [ "sandbox" == "$EXEC_MODE" ]; then
    export sandbox_flag=1
elif [ "standard" == "$EXEC_MODE" ]; then
    export sandbox_flag=0
else
    echo "$$: Unknown value ${EXEC_MODE} for execution mode" 1>&2
    exit 1
fi

echo "Execution mode: $EXEC_MODE"

## Quoted flag: unquoted expansion would break `[ -eq ]` if ever empty.
if [ "$sandbox_flag" -eq 1 ]; then
    "${PASH_SPEC_TOP}/overlay-sandbox/run-sandboxed.sh" "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute_in_overlay.sh"
else
    echo "In standard mode"
    "${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute_in_overlay.sh"
fi


Loading