Skip to content
Merged
12 changes: 9 additions & 3 deletions overlay-sandbox/commit-sandbox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ export SANDBOX_DIR=${1?No sandbox dir given}
## Assumes that the sandbox dir is called upperdir
export upperdir=upperdir
## Note: We are ignoring changes in the rikerfiles
## Note: We are ignoring changes in the rikerfiles (and the swapfile)
ignore_patterns="-e .rkr -e Rikerfile -e swapfile"
echo "Ignoring changes in: $ignore_patterns"
## -mindepth 2 picks up entries (files and dirs) below the sandbox root;
## $ignore_patterns is deliberately unquoted so its "-e pat" words split.
changed_files=$(find ${SANDBOX_DIR}/${upperdir}/* -mindepth 2 | grep -v ${ignore_patterns})

if [ ! -z "$changed_files" ]; then
echo "Changes detected in the following files:"
Expand All @@ -17,10 +17,16 @@ if [ ! -z "$changed_files" ]; then
# even though it ran successfully in unshare
# attempt to copy each changed file in the current working directory
while IFS= read -r changed_file; do
# Classify the sandbox entry; `file -b` prints exactly "directory" for dirs.
file_type=$(file -b "$changed_file")
if [ "$file_type" == "directory" ]; then
    # Recreate the directory at its sandbox-relative location.
    # (Fix: the old code also passed "$file_type" to mkdir, creating a
    # stray directory literally named "directory" in the working tree.)
    mkdir -p "${changed_file#$SANDBOX_DIR/$upperdir}"
else
    cp "$changed_file" "${changed_file#$SANDBOX_DIR/$upperdir}"
fi
# echo "Attempting to copy: $changed_file"
# echo " to ${changed_file#$SANDBOX_DIR/$upperdir}"
## TODO: Add error handling if cp failed
cp "$changed_file" "${changed_file#$SANDBOX_DIR/$upperdir}"
# cp "$changed_file" "${changed_file#$SANDBOX_DIR/$upperdir}"
if [ $? -ne 0 ]; then
echo "Error: Failed to copy $changed_file"
exit 1
Expand Down
2 changes: 1 addition & 1 deletion parallel-orch/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def async_run_and_trace_command(command, trace_file, node_id, sandbox_mode=False
# print(" -- Standard mode")
args.append("standard")
args.append(str(node_id))
process = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# For debugging
# process = subprocess.Popen(args)
return process
Expand Down
18 changes: 14 additions & 4 deletions parallel-orch/partial_program_order.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

import executor
import trace
import sys

class Node:
def __init__(self, id, cmd):
Expand Down Expand Up @@ -405,7 +405,7 @@ def speculate_cmd_non_blocking(self, node_id: int):

def command_execution_completed(self, node_id: int, exit_code:int, sandbox_dir: str):
self.sandbox_dirs[node_id] = sandbox_dir
_proc, trace_file = self.commands_currently_executing.pop(node_id)
proc, trace_file = self.commands_currently_executing.pop(node_id)
# Handle stopped by riker due to network access
if int(exit_code) == 159:
logging.debug(f" Adding {node_id} to stopped")
Expand All @@ -417,21 +417,31 @@ def command_execution_completed(self, node_id: int, exit_code:int, sandbox_dir:
logging.debug(f" --- Node {node_id}, just finished execution ---")
to_commit = self.resolve_dependencies_continuous_and_move_frontier(node_id)
self.commit_cmd_workspaces(to_commit)
# FIXME: Not suitable for large outputs as it buffers the whole output
# Make it print in real time maybe
# https://stackoverflow.com/a/803421
self.print_cmd_out(proc)

def print_cmd_out(self, proc):
proc_stdout, proc_stderr = proc.communicate()
print(proc_stdout.decode())
print(proc_stderr.decode(), file=sys.stderr)

def commit_cmd_workspaces(self, to_commit_ids):
    """Commit the sandbox workspace of each command id in to_commit_ids.

    Commands that ran in the main workspace (empty sandbox dir) need no
    commit. (Fix: removed a stale duplicate executor.commit_workspace()
    call left over from a merge, which committed each workspace twice.)
    """
    logging.debug(len(to_commit_ids))
    for cmd_id in to_commit_ids:
        workspace = self.sandbox_dirs[cmd_id]
        if workspace != "":
            logging.debug(f" (!) Committing workspace of cmd {cmd_id} found in {workspace}")
            commit_workspace_out = executor.commit_workspace(workspace)
            # commit_workspace returns raw bytes; decode for readable logs.
            logging.debug(commit_workspace_out.decode())
        else:
            logging.debug(f" (!) No need to commit workspace of cmd {cmd_id} as it was run in the main workspace")

def log_rw_sets(self):
    """Debug-log, per node, the read-set size and the full write-set.

    (Fix: the old revision's log line was left alongside the new one by a
    merge, logging every node twice; only the newer form is kept.)
    """
    logging.debug("====== RW Sets " + "=" * 65)
    for node_id, rw_set in self.rw_sets.items():
        logging.debug(f"ID:{node_id} | R:{len(rw_set.get_read_set()) if rw_set is not None else None} | W:{rw_set.get_write_set() if rw_set is not None else None}")

def log_partial_program_order_info(self):
logging.debug(f"=" * 80)
Expand Down
7 changes: 2 additions & 5 deletions parallel-orch/scheduler_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import argparse
import logging
import signal
Expand Down Expand Up @@ -121,10 +120,10 @@ def handle_command_exec_complete(self, input_cmd: str):
assert(input_cmd.startswith("CommandExecComplete:"))
## Read the node id from the command argument
cmd_id, exit_code, sandbox_dir = self.__parse_command_exec_complete(input_cmd)
logging.debug(input_cmd)


## Gather RWset, resolve dependencies, and progress graph
self.partial_program_order.command_execution_completed(cmd_id, exit_code, sandbox_dir)
logging.debug(input_cmd)

## If there is a connection waiting for this node_id, respond to it
if cmd_id in self.waiting_for_response:
Expand Down Expand Up @@ -196,8 +195,6 @@ def run(self):
shutdown()




def shutdown():
## There may be races since this is called through the signal handling
logging.debug("PaSh-Spec scheduler is shutting down...")
Expand Down
206 changes: 177 additions & 29 deletions parallel-orch/trace.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,59 @@
import re
import sys
import os
from typing import Tuple

# Parse the Riker trace structure
#
# TODO: This module will need to contain the definition
# of the trace structure and its methods that we will use to parse it.
from enum import Enum
import logging

class Ref(Enum):
    """Well-known root references a Riker trace can name on the right-hand
    side of an "rX = <NAME>" line.

    NOTE(review): member values are captured once at import time — e.g. CWD
    freezes os.getcwd(); a later os.chdir() is not reflected. Confirm this
    is acceptable for the orchestrator's lifetime.
    """

    STDIN = sys.stdin
    STDOUT = sys.stdout
    STDERR = sys.stderr
    ROOT = os.path.abspath(os.sep)
    # Not sure this is always correct
    # but it doesn't affect the results
    CWD = os.getcwd()


class PathRef:
    """A path reference parsed from a Riker trace: a parent ref plus a
    relative path, with read/write flags taken from the open-config string."""

    def __init__(self, ref, path, permissions, no_follow):
        self.ref = ref    # parent ref id (later overwritten with a base path)
        self.path = path
        self.is_read, self.is_write = self.resolve_permissions(permissions)
        self.is_nofollow = no_follow

    def resolve_permissions(self, permissions: str):
        """Return (is_read, is_write) from an open-config string like 'rw'."""
        # Simple membership tests replace the old verbose if/else ladder.
        return "r" in permissions, "w" in permissions

    def __str__(self):
        return f"PathRef({self.ref}, {self.path}, {'r' if self.is_read else '-'}{'w' if self.is_write else '-'} {'no follow' if self.is_nofollow else ''})"

    def get_resolved_path(self):
        """Join parent ref and path, normalizing '/./'.

        Returns None for nofollow refs (now explicit — the old code fell
        off the end of the function and returned None implicitly).
        """
        if not self.is_nofollow:
            return os.path.join(self.ref, self.path).replace("/./", "/")
        return None


def log_resolved_trace_items(resolved_dict):
    """Debug-log each resolved trace item; items without PathRef's attributes
    (e.g. Ref enum members) are logged with their plain repr instead."""
    for k, v in resolved_dict.items():
        try:
            logging.debug(f"{k}: {v.get_resolved_path()} {'r' if v.is_read else '-'}{'w' if v.is_write else '-'} {'no follow' if v.is_nofollow else ''}")
        except Exception:
            # Fix: was a bare `except:`, which would also swallow
            # KeyboardInterrupt/SystemExit; Exception keeps the intended
            # "fall back to repr" behavior without hiding interrupts.
            logging.debug(f'{k}: {v}')

def remove_command_redir(cmd):
    """Drop everything from the first output-redirection '>' onward,
    trimming trailing whitespace from what remains."""
    head, _sep, _tail = cmd.partition(">")
    return head.rstrip()

def remove_command_prefix(line):
def remove_command_prefix(line) -> str:
    """Strip the leading '[...]: ' trace prefix and trailing whitespace.

    Splits on the FIRST ']: ' only (maxsplit=1): the old unbounded split
    took element [1], silently truncating any payload that itself
    contained ']: '. Also drops the pointless f-prefix on the separator.
    """
    return line.split("]: ", 1)[1].rstrip()

def get_command_prefix(line):
Expand All @@ -31,52 +75,156 @@ def get_path_ref_open_config(trace_item):
open_config = re.split('\(|\)', open_config_suffix)[0].rstrip()
return open_config

def get_path_ref_no_follow(trace_item):
    """True when the RAW trace line carries the 'nofollow' flag.

    Note: unlike the predicates below, this takes the raw string, not a
    PathRef — it is used while the line is still being parsed.
    """
    return "nofollow" in trace_item

def is_path_ref_read(trace_item: PathRef):
    """True when the PathRef was opened with read permission."""
    # Fix: removed the stale pre-refactor string-based duplicate of this
    # function that a merge left interleaved here.
    return trace_item.is_read

def is_path_ref_write(trace_item: PathRef):
    """True when the PathRef was opened with write permission."""
    return trace_item.is_write

def is_path_ref_empty(trace_item: PathRef):
    """True when the PathRef was opened with neither read nor write
    permission (used below as a directory-creation heuristic)."""
    return not trace_item.is_read and not trace_item.is_write

def get_path_ref_name(trace_item):
    """Return the (unquoted) path operand of a new-PathRef trace line.

    (Fix: removed an unreachable duplicate assignment/return pair left
    behind by a merge.)
    """
    assert(is_new_path_ref(trace_item))
    return trace_item.split(", ")[1].replace('"', '')

def is_command_prefix(line):
    """True when the trace line was emitted by a numbered command
    (prefix '[Command ...]')."""
    # NOTE(review): body reconstructed — the pasted diff truncated it to a
    # bare `if`; the startswith test is the only behavior visible.
    return line.startswith("[Command")

def get_path_ref_ref(trace_item):
    """Return the parent-ref token (e.g. '3' from 'PathRef(r3, ...')
    of a new-PathRef trace line."""
    assert(is_new_path_ref(trace_item))
    return trace_item.split(", ")[0].split("(")[1]

def is_no_command_prefix(line):
    """True when the trace line belongs to no command
    (prefix '[No Command ...]')."""
    # startswith already yields a bool; the f-prefix and the explicit
    # True/False returns were redundant.
    return line.startswith("[No Command")

def is_launch(line):
    """True when the trace line records a Launch(...) event."""
    return line.find("Launch(") != -1

def parse_launch_command(trace_item):
    """Parse a 'Launch(<cmd>, [rX=rY, ...])' line into [lhs, rhs] ref pairs.

    (Fix: a rename left the old `def get_launch_assignments` header
    stacked on top of this one; only the new name survives.)
    """
    assert(is_launch(trace_item))
    # Everything after the first ", " is the assignment list.
    assignment_suffix = ", ".join(trace_item.split(", ")[1:])
    # Strip the surrounding '[' and '])' then split the entries.
    assignment_string = assignment_suffix[1:-2].split(",")
    return [x.split("=") for x in assignment_string]

def parse_launch(trace_item):
    """Validate that trace_item is a Launch line (no parsing performed).

    NOTE(review): looks like an abandoned stub — confirm it still has
    callers before removing.
    """
    assert(is_launch(trace_item))

def get_lauch_name(trace_item):
    """Return the command name from a 'Command <name>], ...' Launch line.

    NOTE(review): name keeps its historical typo ('lauch') so existing
    callers are unaffected. (Fix: removed a duplicated return left by an
    indentation-only diff.)
    """
    assert(is_launch(trace_item))
    launch_name_dirty = trace_item.split("],")[0]
    return launch_name_dirty.split("Command ")[1]

## Parse the trace object and gather rw sets for this command
def parse_and_gather_cmd_rw_sets(trace_object) -> Tuple[set, set]:
def get_no_command_ref_id(trace_item):
    """Return the left-hand ref id token of an 'rX = <REF>' line."""
    # Fix: removed orphaned statements of the deleted old implementation
    # of parse_and_gather_cmd_rw_sets that a merge left between these defs.
    return trace_item.split("=")[0].strip()

def get_no_command_ref_ref(trace_item):
    """Return the right-hand ref token of an 'rX = <REF>' line."""
    return trace_item.split("=")[1].strip()

new_path_ref_items = [item for item in relevant_trace_items if is_new_path_ref(item)]
def is_prefix_of_cmd(line, prefix):
    """True when prefix is non-None and occurs inside the line's
    command prefix."""
    if prefix is None:
        return False
    return prefix in get_command_prefix(line)

read_set = {get_path_ref_name(item) for item in new_path_ref_items
if is_path_ref_read(item)}
write_set = {get_path_ref_name(item) for item in new_path_ref_items
if is_path_ref_write(item)}
def parse_rw_sets(trace_object):
    """First pass over a trace: build {ref_id -> Ref | PathRef}.

    Three line shapes are handled:
      * "[No Command]: rX = CWD|ROOT|STDIN|STDOUT|STDERR" -> Ref member
      * "... Launch(cmd, [rX=rY, ...])"                   -> alias refs
      * new PathRef declarations                          -> PathRef objects
    """
    refs_dict = {}
    for line in trace_object:
        # "[No Command]" lines come first in a trace and seed the root refs.
        if is_no_command_prefix(line):
            line = remove_command_prefix(line)
            if " = " in line:
                lhs_ref = int(get_no_command_ref_id(line).strip().lstrip("r"))
                rhs_ref = get_no_command_ref_ref(line).strip().lstrip("r")
                # Name-based enum lookup replaces the old five-way if/elif
                # chain; unknown names are still silently ignored.
                if rhs_ref in Ref.__members__:
                    refs_dict[lhs_ref] = Ref[rhs_ref]
        # Launch lines alias the child command's refs to the parent's.
        elif is_launch(line):
            assignments = parse_launch_command(remove_command_prefix(line))
            for assignment in assignments:
                refs_dict[int(assignment[0].strip().lstrip("r"))] = refs_dict[int(assignment[1].strip().lstrip("r"))]
        # New PathRef declarations.
        elif is_new_path_ref(line):
            line = remove_command_prefix(line).strip()
            lhs_ref = int(get_path_ref_id(line).strip().lstrip("r"))
            ref = int(get_path_ref_ref(line).strip().lstrip("r"))
            name = get_path_ref_name(line).strip()
            open_config = get_path_ref_open_config(line).strip()
            no_follow = get_path_ref_no_follow(line)
            refs_dict[lhs_ref] = PathRef(ref, name, open_config, no_follow)
    return refs_dict

def traverse_path_ref(refs_dict: dict, ref: PathRef):
    """Walk the parent-ref chain of ref while it stays inside followable
    PathRefs, returning the terminal ref field (an id or a Ref)."""
    current = ref
    while (isinstance(current, PathRef)
           and not current.is_nofollow
           and isinstance(refs_dict[current.ref], PathRef)):
        current = refs_dict[current.ref]
    return current.ref

def resolve_rw_set_refs(refs_dict):
    """Collapse every PathRef's parent chain down to its terminal ref,
    mutating the entries in place; returns the same dict."""
    for entry in refs_dict.values():
        if isinstance(entry, PathRef):
            entry.ref = traverse_path_ref(refs_dict, entry)
    return refs_dict

def replace_path_ref_terminal_nodes(refs_dict: dict):
    """Substitute each followable PathRef's parent ref id with the concrete
    `.value` of the entry it points to (mutates refs_dict in place).

    NOTE(review): `.value` implies the parent is a Ref enum member here —
    confirm resolve_rw_set_refs always collapses chains that far.
    """
    for ref in refs_dict.values():
        if isinstance(ref, PathRef) and not ref.is_nofollow:
            # HACK: This is hard-coded stdout
            if ref.ref not in refs_dict:
                # Parent id missing from the table: fall back to id 4's value
                # (presumably stdout per the HACK note above) — TODO confirm
                # id 4 is always present.
                ref.ref = refs_dict[4].value
            else:
                ref.ref = refs_dict[ref.ref].value

return read_set, write_set
## Parse the trace object and gather rw sets for this command
def parse_and_gather_cmd_rw_sets(trace_object) -> Tuple[set, set]:
    """Return (read_set, write_set) of resolved paths for one command trace.

    Pipeline: parse refs -> collapse ref chains -> substitute terminal base
    paths -> classify each resolved PathRef as read and/or write.

    A permissionless PathRef that immediately follows a write PathRef is
    treated as a directory the command created; the detected directories
    are merged into a single trailing-slash write entry at the end.
    """
    refs_dict = parse_rw_sets(trace_object)
    resolved_dict = resolve_rw_set_refs(refs_dict)
    replace_path_ref_terminal_nodes(resolved_dict)
    log_resolved_trace_items(resolved_dict)

    read_set = set()
    write_set = []
    dir_set = []
    prev_item = None
    # Iterate ids in order. Ids may be sparse, so do NOT assume 0..len-1:
    # the old `range(len(resolved_dict))` loop silently dropped any id >=
    # len, and `resolved_dict[i-1]` could KeyError on a gap.
    for ref_id in sorted(resolved_dict):
        item = resolved_dict[ref_id]
        # Plain Ref roots (CWD, ROOT, stdio) carry no path information.
        if not isinstance(item, Ref):
            if is_path_ref_read(item):
                read_set.add(item.get_resolved_path())
            if is_path_ref_write(item):
                write_set.append(item.get_resolved_path())
            # Permissionless ref right after a write: directory creation.
            if is_path_ref_empty(item):
                if isinstance(prev_item, PathRef) and is_path_ref_write(prev_item):
                    dir_set.append(item.get_resolved_path())
                    write_set.pop()  # that write WAS the directory creation
        prev_item = item

    # Merge detected directories into one canonical write path. Guarding the
    # empty case fixes the old behavior of appending a spurious "/" (root!)
    # to write_set when no directories were detected.
    if dir_set:
        prefix = os.path.commonprefix(dir_set)
        suffixes = [d.replace(prefix, "") for d in dir_set]
        dir_string = ""
        for d in suffixes:
            dir_string = os.path.join(dir_string, d)
        to_add = os.path.join(prefix, dir_string)
        if not to_add.endswith("/"):
            to_add += "/"
        write_set.append(to_add)
    return read_set, set(write_set)