diff --git a/README.md b/README.md index 17053c2e..db90fe82 100644 --- a/README.md +++ b/README.md @@ -1,66 +1,78 @@ -## Dynamic Parallelizer +## hs README -A dynamic parallelizer that optimistically/speculatively executes everything in a script in parallel and ensures that it executes correctly by tracing it and reexecuting the parts that were erroneous. +### Overview -## Installing +`hs` is a system for executing shell scripts out of order. It achieves this by tracing the script's execution, and if an error arises due to speculative execution, the script re-executes the necessary parts to ensure correct outcomes. The project aims to boost the parallel execution of shell scripts, reducing their runtime and enhancing efficiency. -```sh -./scripts/install_deps_ubuntu20.sh -``` +### Structure -## Tests +The project's top-level directory contains the following: -To run the tests: -```sh -cd test -./test_orch.sh -``` +- `deps`: Dependencies required by `hs`. +- `docs`: Documentation and architectural diagrams. +- `model-checking`: Tools and utilities for model checking. +- `parallel-orch`: Main orchestration components. +- `pash-spec.sh`: Entry script to initiate the `hs` process. +- `README.md`: This documentation file. +- `report`: Generated reports related to test runs and performance metrics. +- `requirements.txt`: List of Python dependencies. +- `Rikerfile`: Configuration file for Riker. -### TODO Items +### Installation -#### Complete control flow and complex script support +Install `hs` on your Linux-based machine by following these steps: -Extend the architecture to support complete scripts and not just partial order graphs of commands. +**Note:** Currently works with `Ubuntu 20.04` or later -A potential solution is shown below: +1. Navigate to the project directory: + ```sh + cd path_to/dynamic-parallelizer + ``` -![Architecture Diagram](/docs/handdrawn_architecture.jpeg) +2. Run the installation script: + ```sh + ./scripts/install_deps_ubuntu20.sh + ``` -This solution includes a preprocessor that creates two executable artifacts: -- the preprocessed/instrumented script (similar to what the PaSh-JIT preprocessor produces) -- the partial program order graph (a graph of commands that will be speculated and executed with tracing from the orchestrator) +This script will handle all the necessary installations, including dependencies, try, Riker, and PaSh. -The graph might contain unexpanded commands, so the orchestrator should support unexpanded strings. -On these commands, the orchestrator can speculate for the value of these strings and then when they become the frontier (the preprocessed script has reached them), we actually know their values and could confirm/abort the speculation. +### Running `hs` -The two executors communicate with each other and progress through the script execution in tandem. The JIT executor (left) also needs to trace execution to inform the orchestrator about changes in the environment. +The main entry script to initiate `hs` is `pash-spec.sh`. This script sets up the necessary environment and invokes the orchestrator in `parallel-orch/orch.py`. It's designed to accept a variety of arguments to customize its behavior, such as setting debug levels or specifying log files. -#### Orchestator: Partial Program Order Graph +Example of running the script: -**Note:** we have moved to a continuous scheduling implementation. An example explaining its operation can be found [here](/docs/example.md). +```bash +./pash-spec.sh [arguments] script_to_speculatively_run.sh +``` -The orchestrator needs to support arbitrary partial program order graphs (instead of just sequences of instructions), to figure out the precise real program order dependencies. +**Arguments**: +- `-d, --debug-level`: Set the debugging level. Default is `0`. +- `-f, --log_file`: Define the logging output file. By default, logs are printed to stdout. +- `--sandbox-killing`: Kill any running overlay instances before committing to the lower layer. +- `--env-check-all-nodes-on-wait`: On a wait, check for environment changes between the current node and all other waiting nodes. (not fully functional yet!) -An instance of a graph is shown below: +### Testing -![Example Partial Program Order Graph](/docs/handdrawn_partial_program_order.jpeg) +To run the provided tests: -One important characteristic of the graph (and the speculative execution algorithm) is that there is a committed prefix-closed part that has already executed and cannot be affected. -The rest of the graph is uncommited and therefore might or might not have completed execution. The uncommited frontier, the part of the graph adjacent to the prefix is guaranteed to execute and complete without speculation (since we have both the environment and the variables resolved) and this is part of the argument for the termination of the algorithm. Every step that the orchestration takes, it can always commit the uncommited frontier, and therefore the commited prefix grows until it reaches the whole graph. +```bash +./test/test_orch.sh +``` -#### Orchestrator: Backward dependencies and Execution Isolation/Aborting/Reverting +For in-depth analysis, set the `DEBUG` environment variable to `2` for detailed logs and redirect logs to a file: -How do we resolve backward dependencies? For example: -```sh -grep foo in1 > out1 -grep bar in0 > in1 ## Its write might affect the first command exec. +```bash +DEBUG=2 ./test/test_orch.sh 2>logs.txt ``` -One solution would be to run the non-frontier (non-root) commands in an isolated environment and only at the end of their execution commit their results. This might have significant overhead, except if we can just write to temporary files and then move them? Or let them work in a temporary directory? +### Contributing and Further Development + +Contributions are always welcome! The project roadmap includes extending the architecture to support complete scripts, optimizing the scheduler for better performance, etc. -Another way would be to dynamically track writes of non-frontier commands and stop them when they try to write to something that might be a read dependency of the first, but there are timing issues here that I don't see how to resolve. +For a detailed description of possible optimizations, see the [related issues](https://github.com/binpash/dynamic-parallelizer/issues?q=is%3Aopen+is%3Aissue+label%3Aoptimization) -#### Commands that change current directory +### License -Can we actually trace that and not run these commands? Is that simply a change of an environment variable? They will run in a forked version anyway, but we want to see their results. +`hs` is licensed under the MIT License. See the `LICENSE` file for more information. diff --git a/deps/pash b/deps/pash index 956064e3..9044f6db 160000 --- a/deps/pash +++ b/deps/pash @@ -1 +1 @@ -Subproject commit 956064e3fd50380538e0bc3f5dde4957c0d2a12c +Subproject commit 9044f6dbb79f2bd90a9076453932fe842ea8ba09 diff --git a/parallel-orch/analysis.py b/parallel-orch/analysis.py index 76408759..3620d28e 100644 --- a/parallel-orch/analysis.py +++ b/parallel-orch/analysis.py @@ -29,47 +29,67 @@ def parse_shell_to_asts(input_script_path) -> "list[AstNode]": except libdash.parser.ParsingException as e: logging.error(f'Parsing error: {e}') exit(1) + +def validate_node(ast) -> bool: + assert(isinstance(ast, (CommandNode, PipeNode))) + if isinstance(ast, CommandNode): + return True + else: + for cmd in ast.items: + assert isinstance(cmd, CommandNode) -## Returns true if the script is safe to speculate and execute outside -## of the original shell context. -## -## The script is not safe if it might contain a shell primitive. Therefore -## the analysis checks if the command in question is one of the underlying -## shell's primitives (in our case bash) and if so returns False -def safe_to_execute(asts: "list[AstNode]", variables: dict) -> bool: - ## There should always be a single AST per node and it must be a command - assert(len(asts) == 1) - ast = asts[0] - assert(isinstance(ast, CommandNode)) - logging.debug(f'Ast in question: {ast}') + +def is_node_safe(node: CommandNode, variables: dict) -> str: ## Expand and check whether the asts contain - ## a command substitution or a primitive. + ## a command substitution or a primitive. ## If so, then we need to tell the original script to execute the command. ## Expand the command argument - cmd_arg = ast.arguments[0] + cmd_arg = node.arguments[0] exp_state = expand.ExpansionState(variables) ## TODO: Catch exceptions around here expanded_cmd_arg = expand.expand_arg(cmd_arg, exp_state) cmd_str = string_of_arg(expanded_cmd_arg) logging.debug(f'Expanded command argument: {expanded_cmd_arg} (str: "{cmd_str}")') - - ## TODO: Determine if the ast contains a command substitution and if so - ## run it in the original script. - ## In the future, we should be able to perform stateful expansion too, - ## and properly execute and trace command substitutions. - + ## KK 2023-05-26 We need to keep in mind that whenever we execute something ## in the original shell, then we cannot speculate anything ## after it, because we cannot track read-write dependencies ## in the original shell. - if cmd_str in BASH_PRIMITIVES: return False - return True + +def is_pipe_node_safe_to_execute(node: PipeNode, variables: dict) -> bool: + for cmd in node.items: + logging.debug(f'Ast in question: {cmd}') + if not is_node_safe(cmd, variables): + return False + return True + +## Returns true if the script is safe to speculate and execute outside +## of the original shell context. +## +## The script is not safe if it might contain a shell primitive. Therefore +## the analysis checks if the command in question is one of the underlying +## shell's primitives (in our case bash) and if so returns False +def safe_to_execute(asts: "list[AstNode]", variables: dict) -> bool: + ## There should always be a single AST per node and it must be a command + assert(len(asts) == 1) + if isinstance(asts[0], PipeNode): + return is_pipe_node_safe_to_execute(asts[0], variables) + else: + assert(isinstance(asts[0], CommandNode)) + logging.debug(f'Ast in question: {asts[0]}') + return is_node_safe(asts[0], variables) + ## TODO: Determine if the ast contains a command substitution and if so + ## run it in the original script. + ## In the future, we should be able to perform stateful expansion too, + ## and properly execute and trace command substitutions. + + BASH_PRIMITIVES = ["break", "continue", "return"] diff --git a/parallel-orch/config.py b/parallel-orch/config.py index 2cb9b5f9..6d9b2abf 100644 --- a/parallel-orch/config.py +++ b/parallel-orch/config.py @@ -1,6 +1,7 @@ import os import subprocess import logging +import time ## TODO: Figure out how logging here plays out together with the log() in PaSh @@ -27,17 +28,21 @@ def log_root(msg, *args, **kwargs): ## Ensure that PASH_TMP_PREFIX is set by pa.sh -assert(not os.getenv('PASH_SPEC_TMP_PREFIX') is None) PASH_SPEC_TMP_PREFIX = os.getenv('PASH_SPEC_TMP_PREFIX') SOCKET_BUF_SIZE = 8192 SCHEDULER_SOCKET = os.getenv('PASH_SPEC_SCHEDULER_SOCKET') -MAX_KILL_ATTEMPTS = 10 # Define a maximum number of kill attempts for each process in the partial program order - INSIGNIFICANT_VARS = {'PWD', 'OLDPWD', 'SHLVL', 'PASH_SPEC_TMP_PREFIX', 'PASH_SPEC_SCHEDULER_SOCKET', 'PASH_SPEC_TOP', 'PASH_TOP', 'PASH_TOP_LEVEL','RANDOM', 'LOGNAME', 'MACHTYPE', 'MOTD_SHOWN', 'OPTERR', 'OPTIND', 'PPID', 'PROMPT_COMMAND', 'PS4', 'SHELL', 'SHELLOPTS', 'SHLVL', 'TERM', 'UID', 'USER', 'XDG_SESSION_ID'} -SIGNIFICANT_VARS = {'foo', 'bar', 'baz'} +SIGNIFICANT_VARS = {'foo', 'bar', 'baz', 'file1', 'file2', 'file3', 'file4', 'file5', 'LC_ALL', 'nchars', 'filename'} + +START_TIME = time.time() + +NAMED_TIMESTAMPS = {} + +SANDBOX_KILLING = False +SPECULATE_IMMEDIATELY = False \ No newline at end of file diff --git a/parallel-orch/executor.py b/parallel-orch/executor.py index 3b33025c..0349285e 100644 --- a/parallel-orch/executor.py +++ b/parallel-orch/executor.py @@ -12,18 +12,18 @@ def async_run_and_trace_command_return_trace(command, node_id, latest_env_file, trace_file = util.ptempfile() stdout_file = util.ptempfile() stderr_file = util.ptempfile() - post_exec_env = util.ptempfile() + post_execution_env_file = util.ptempfile() logging.debug(f'Scheduler: Stdout file for: {node_id} is: {stdout_file}') logging.debug(f'Scheduler: Stderr file for: {node_id} is: {stderr_file}') logging.debug(f'Scheduler: Trace file for: {node_id}: {trace_file}') - process = async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, latest_env_file, post_exec_env, speculate_mode) - return process, trace_file, stdout_file, stderr_file, post_exec_env + process = async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, latest_env_file, post_execution_env_file, speculate_mode) + return process, trace_file, stdout_file, stderr_file, post_execution_env_file def async_run_and_trace_command_return_trace_in_sandbox_speculate(command, node_id, latest_env_file): - process, trace_file, stdout_file, stderr_file, post_exec_env = async_run_and_trace_command_return_trace(command, node_id, latest_env_file, speculate_mode=True) - return process, trace_file, stdout_file, stderr_file, post_exec_env + process, trace_file, stdout_file, stderr_file, post_execution_env_file = async_run_and_trace_command_return_trace(command, node_id, latest_env_file, speculate_mode=True) + return process, trace_file, stdout_file, stderr_file, post_execution_env_file -def async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, latest_env_file, post_exec_env, speculate_mode=False): +def async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, node_id, stdout_file, stderr_file, latest_env_file, post_execution_env_file, speculate_mode=False): ## Call Riker to execute the command run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command.sh' args = ["/bin/bash", run_script, command, trace_file, stdout_file, latest_env_file] @@ -32,7 +32,7 @@ def async_run_and_trace_command_return_trace_in_sandbox(command, trace_file, nod else: args.append("standard") args.append(str(node_id)) - args.append(post_exec_env) + args.append(post_execution_env_file) # Save output to temporary files to not saturate the memory logging.debug(args) process = subprocess.Popen(args, stdout=None, stderr=None) diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py index aef95f46..8aabcd7f 100644 --- a/parallel-orch/partial_program_order.py +++ b/parallel-orch/partial_program_order.py @@ -7,23 +7,25 @@ import config import executor import trace +from util import * import util +from collections import defaultdict -from shasta.ast_node import AstNode, CommandNode +from shasta.ast_node import AstNode, CommandNode, PipeNode class CompletedNodeInfo: - def __init__(self, exit_code, post_exec_env, stdout_file, sandbox_dir): + def __init__(self, exit_code, post_execution_env_file, stdout_file, sandbox_dir): self.exit_code = exit_code - self.post_exec_env = post_exec_env + self.post_execution_env_file = post_execution_env_file self.stdout_file = stdout_file self.sandbox_dir = sandbox_dir def get_exit_code(self): return self.exit_code - def get_post_exec_env(self): - return self.post_exec_env + def get_post_execution_env_file(self): + return self.post_execution_env_file def get_stdout_file(self): return self.stdout_file @@ -32,7 +34,7 @@ def get_sandbox_dir(self): return self.sandbox_dir def __str__(self): - return f'CompletedNodeInfo(ec:{self.get_exit_code()}, env:{self.get_post_exec_env()}, stdout:{self.get_stdout_file()}, sandbox:{self.get_sandbox_dir()})' + return f'CompletedNodeInfo(ec:{self.get_exit_code()}, env:{self.get_post_execution_env_file()}, stdout:{self.get_stdout_file()}, sandbox:{self.get_sandbox_dir()})' ## This class is used for both loop contexts and loop iters ## The indices go from inner to outer @@ -167,7 +169,8 @@ def __init__(self, id, cmd, asts, loop_context: LoopStack): ## There can only be a single AST per node, and this ## must be a command. assert(len(asts) == 1) - assert(isinstance(asts[0], CommandNode)) + # Check that the node contains only CommandNode(s) + analysis.validate_node(asts[0]) self.cmd_no_redir = trace.remove_command_redir(self.cmd) self.loop_context = loop_context ## Keep track of how many iterations of this loop node we have unrolled @@ -280,7 +283,13 @@ def __init__(self, nodes, edges, initial_env_file): self.latest_envs = {} self.initial_env_file = initial_env_file self.waiting_for_frontend = set() - + ## In case we spot a dependency meaning a node must execute after another node, it will appear here + ## Contains the nodes to execute only after the key node finishes execution + self.run_after = defaultdict(set) + self.pending_to_execute = set() + self.to_be_resolved_prev = {} + self.prechecked_env = set() + def __str__(self): return f"NODES: {len(self.nodes.keys())} | ADJACENCY: {self.adjacency}" @@ -355,6 +364,23 @@ def set_latest_env_file_for_node(self, node_id: NodeId, latest_env_file: str): def get_latest_env_file_for_node(self, node_id: NodeId) -> str: return self.latest_envs.get(node_id) + + def get_most_recent_possible_new_env_for_node(self, node_id) -> str: + most_recent_env_node = node_id + while self.get_new_env_file_for_node(most_recent_env_node) is None: + predecessor = self.get_prev(most_recent_env_node) + + ## This will trigger when we move to full Partial Orders + assert len(predecessor) <= 1 + + ## If there are no predecessors for a node it means we are at the source + ## so there is no point to search further back + if len(predecessor) == 0: + break + else: + most_recent_env_node = predecessor[0] + + return self.get_new_env_file_for_node(most_recent_env_node) ## This returns all previous nodes of a sub partial order def get_sub_po_prev_nodes(self, node_ids: "list[NodeId]") -> "list[NodeId]": @@ -392,17 +418,23 @@ def init_partial_order(self): self.init_workset() logging.debug(f'Initialized workset') self.populate_to_be_resolved_dict() - self.init_latest_env_files() + if config.SPECULATE_IMMEDIATELY: + self.init_latest_env_files() logging.debug(f'To be resolved sets per node:') logging.debug(self.to_be_resolved) logging.info(f'Initialized the partial order!') self.log_partial_program_order_info() - assert(self.valid()) - def init_latest_env_files(self): + + def init_latest_env_files(self, node=None): + if node is None: + env_to_assign = self.initial_env_file + else: + env_to_assign = self.get_new_env_file_for_node(node) for node_id in self.get_all_non_committed(): - self.set_latest_env_file_for_node(node_id, self.initial_env_file) + self.set_latest_env_file_for_node(node_id, env_to_assign) + def init_workset(self): self.workset = self.get_all_non_committed_standard_nodes() @@ -593,6 +625,16 @@ def add_to_write_set(self, node_id: NodeId, item: str): def add_to_speculated(self, node_id: NodeId): self.speculated = self.speculated.union([node_id]) + def is_first_node_when_env_is_uninitialized(self, speculate_immediately): + if not speculate_immediately: + starting_env_node = self.get_source_nodes() + ## We may have a loop node at the start + ## In that case, we roll back to the initial env + if len(starting_env_node) > 0 and self.get_latest_env_file_for_node(starting_env_node[0]) is None: + logging.debug("Initializing latest env and speculating") + return True + return False + # Check if the specific command can be resolved. # KK 2023-05-04 I am not even sure what this function does and why is it useful. def cmd_can_be_resolved(self, node_id: int) -> bool: @@ -627,96 +669,131 @@ def cmd_can_be_resolved(self, node_id: int) -> bool: logging.debug(f' >> Able to resolve {node_id}') return True - def __kill_all_currently_executing_and_schedule_restart(self): + def __kill_all_currently_executing_and_schedule_restart(self, start=None): nodes_to_kill = self.get_currently_executing() + if start is not None: + nodes_to_kill = [node_id for node_id in nodes_to_kill if node_id in self.get_transitive_closure([start])] for cmd_id in nodes_to_kill: self.__kill_node(cmd_id) + most_recent_new_env = self.get_most_recent_possible_new_env_for_node(cmd_id) + self.prechecked_env.discard(cmd_id) + if most_recent_new_env is not None: + + self.set_latest_env_file_for_node(cmd_id, most_recent_new_env) self.workset.remove(cmd_id) + log_time_delta_from_named_timestamp("PartialOrder", "RunNode", cmd_id) + log_time_delta_from_named_timestamp("PartialOrder", "PostExecResolution", cmd_id, key=f"PostExecResolution-{cmd_id}") # Our new workset is the nodes that were killed - # Previous workset got killed + # Previous workset got killed self.workset.extend(nodes_to_kill) + def __kill_node(self, cmd_id: "NodeId"): logging.debug(f'Killing and restarting node {cmd_id} because some workspaces have to be committed') - proc_to_kill, trace_file, _stdout, _stderr, _post_exec_env = self.commands_currently_executing.pop(cmd_id) + proc_to_kill, trace_file, _stdout, _stderr, _post_execution_env_file = self.commands_currently_executing.pop(cmd_id) # Add the trace file to the banned file list so we know to ignore the CommandExecComplete response self.banned_files.add(trace_file) - # Get all child processes of proc_to_kill - children = util.get_child_processes(proc_to_kill.pid) - - # Kill all child processes - for child in children: - util.kill_process(child) - - # Terminate the main process - util.kill_process(proc_to_kill.pid) + alive_after_kill = util.kill_process_tree(proc_to_kill.pid) + + if alive_after_kill: + logging.critical("Processes still alive after attempting to kill:") + for proc in alive_after_kill: + logging.critical(proc) + else: + logging.debug("All processes were successfully terminated.") def resolve_commands_that_can_be_resolved_and_push_frontier(self): - cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() + # This may be obsolete since we only resolve one node at a time + # cmds_to_resolve = self.__pop_cmds_to_resolve_from_speculated() + # assert len(cmds_to_resolve) <= 1 + if len(self.speculated) == 0: + cmds_to_resolve = [] + else: + cmds_to_resolve = [self.speculated.pop()] logging.debug(f"Commands to check for dependencies this round are: {sorted(cmds_to_resolve)}") logging.debug(f"Commands that cannot be resolved this round are: {sorted(self.speculated)}") - ## Resolve dependencies for the commands that can actually be resolved to_commit = self.__resolve_dependencies_continuous_and_move_frontier(cmds_to_resolve) + for cmd in to_commit: + log_time_delta_from_named_timestamp("PartialOrder", "ResolveDependencies", cmd) + log_time_delta_from_named_timestamp("PartialOrder", "PostExecResolution", cmd, key=f"PostExecResolution-{cmd}") + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "ProcKilling") + if len(to_commit) == 0: logging.debug(" > No nodes to be committed this round") else: logging.debug(f" > Nodes to be committed this round: {to_commit}") logging.trace(f"Commit|"+",".join(str(node_id) for node_id in to_commit)) - self.__kill_all_currently_executing_and_schedule_restart() + if config.SANDBOX_KILLING: + logging.info("Sandbox killing") + self.__kill_all_currently_executing_and_schedule_restart(to_commit) + log_time_delta_from_named_timestamp("PartialOrder", "ProcKilling") self.commit_cmd_workspaces(to_commit) - # self.print_cmd_stderr(stderr) - - def __pop_cmds_to_resolve_from_speculated(self): - cmd_ids_to_check = sorted(list(self.speculated)) - logging.debug(f" > Uncommitted commands done executing to be checked: {cmd_ids_to_check}") - cmds_to_resolve = [] - for cmd_id in cmd_ids_to_check: - # We check if we can resolve any possible dependencies - # If we can't, we have to wait for another cycle - if not self.cmd_can_be_resolved(cmd_id): - if cmd_id not in self.speculated: - logging.debug(f" > Adding node {cmd_id} to waiting list") - logging.trace(f"WaitingAdd|{cmd_id}") - self.speculated.add(cmd_id) - else: - logging.debug(f" > Keeping node {cmd_id} to waiting list") - # If we are in this branch it means that we can resolve the dependencies of the current command - else: - cmds_to_resolve.append(cmd_id) - # We remove the command from the waiting to be resolved set - if cmd_id in self.speculated: - logging.debug(f" > Removing node {cmd_id} from waiting list") - logging.trace(f"WaitingRemove|{cmd_id}") - self.speculated.remove(cmd_id) - else: - logging.debug(f" > Node {cmd_id} is able to be resolved") - return sorted(cmds_to_resolve) + def check_dependencies(self, cmds_to_check, get_first_cmd_ids_fn, update_state_due_to_a_dependency_fn): + for second_cmd_id in cmds_to_check: + for first_cmd_id in get_first_cmd_ids_fn(second_cmd_id): + + if self.rw_sets.get(first_cmd_id) is not None and self.has_forward_dependency(first_cmd_id, second_cmd_id): + update_state_due_to_a_dependency_fn(first_cmd_id, second_cmd_id) + + # Internal function, modified the run_after dict and the pending_to_execute set + def __populate_run_after_dict(self): + for node in self.pending_to_execute.copy(): + prev_to_be_resolved = self.to_be_resolved_prev.get(node) + if prev_to_be_resolved is None: + return + # Check if env has changed since last comparison + elif set(self.to_be_resolved[node]) == set(prev_to_be_resolved): + # Not caring about this dependency because env has not yet changed + self.pending_to_execute.remove(node) + for k, v in self.run_after.items(): + if node in v: + self.run_after[k].remove(node) + + ## Spots dependencies and updates the state. + ## Safe to call everywhere + def resolve_dependencies_early(self, node_id=None): + def get_first_cmd_ids(second_cmd_id): + return sorted(self.to_be_resolved[second_cmd_id], reverse=True) + + def update_state_due_to_a_dependency(first_cmd_id, second_cmd_id): + self.waiting_for_frontend.discard(second_cmd_id) + self.run_after[first_cmd_id].add(second_cmd_id) + self.pending_to_execute.add(second_cmd_id) + logging.debug(f"Early resolution: Rerunning node {second_cmd_id} after {first_cmd_id} because of a dependency") + log_time_delta_from_named_timestamp("PartialOrder", "PostExecResolution", second_cmd_id) + + to_check = {node for node in self.waiting_for_frontend if node not in self.speculated} + if node_id is not None: + to_check.add(node_id) + self.check_dependencies(to_check, get_first_cmd_ids, update_state_due_to_a_dependency) + self.populate_to_be_resolved_dict() + self.__populate_run_after_dict() def resolve_dependencies(self, cmds_to_resolve): - # Init stuff + def get_first_cmd_ids(second_cmd_id): + return sorted([cmd_id for cmd_id in self.to_be_resolved[second_cmd_id] if cmd_id not in self.stopped]) + + def update_state_due_to_a_dependency(first_cmd_id, second_cmd_id): + logging.debug(f' > Command {second_cmd_id} was added to the workset, due to a forward dependency with {first_cmd_id}') + new_workset.add(second_cmd_id) + new_workset = set() - for second_cmd_id in sorted(cmds_to_resolve): - first_cmd_ids = sorted([cmd_id for cmd_id in self.to_be_resolved[second_cmd_id] if cmd_id not in self.stopped]) - for first_cmd_id in first_cmd_ids: - if second_cmd_id not in new_workset: - ## We only check for forward dependencies if the first node is not a loop (abstract) node - if self.is_loop_node(first_cmd_id): - logging.debug(f' > Skipping dependency check with node {first_cmd_id} because it is a loop node') - continue - if self.has_forward_dependency(first_cmd_id, second_cmd_id): - logging.debug(f' > Command {second_cmd_id} was added to the workset, due to a forward dependency with {first_cmd_id}') - new_workset.add(second_cmd_id) + self.check_dependencies(sorted(cmds_to_resolve), get_first_cmd_ids, update_state_due_to_a_dependency) + return new_workset + ## Resolve all the forward dependencies and update the workset ## Forward dependency is when a command's output is the same ## as the input of a following command def __resolve_dependencies_continuous_and_move_frontier(self, cmds_to_resolve): self.log_partial_program_order_info() - + for cmd in cmds_to_resolve: + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "ResolveDependencies", cmd) + logging.debug(f"Commands to be checked for dependencies: {sorted(cmds_to_resolve)}") logging.debug(" --- Starting dependency resolution --- ") new_workset = self.resolve_dependencies(cmds_to_resolve) @@ -885,15 +962,19 @@ def wait_received(self, node_id: NodeId): ## node is very complex and not elegant. ## TODO: Could we swap unrolling and progressing so that we always ## check if a node can be progressed by checking edges? + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "ProgressingPoDueToWait", node_id) self.progress_po_due_to_wait(node_id) + log_time_delta_from_named_timestamp("PartialOrder", "ProgressingPoDueToWait", node_id) + + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "ProgressingPoDueToWait", node_id) ## Unroll some nodes if needed. if node_id.has_iters(): ## TODO: This unrolling can also happen and be moved to speculation. ## For now we are being conservative and that is why it only happens here ## TODO: Move this to the scheduler.schedule_work() (if we have a loop node waiting for response and we are not unrolled, unroll to create work) self.maybe_unroll(node_id) - + assert(self.valid()) def find_outer_loop_sub_partial_order(self, loop_id: int, nodes_subset: "list[NodeId]") -> "list[NodeId]": @@ -1070,8 +1151,9 @@ def unroll_loop_node(self, target_concrete_node_id: NodeId): def maybe_unroll(self, node_id: NodeId) -> NodeId: ## Only unrolls this node if it doesn't already exist in the PO if not self.is_node_id(node_id): + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "Unrolling", node_id) self.unroll_loop_node(node_id) - + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "Unrolling", node_id) ## The node_id must be part of the PO after unrolling, otherwise we did something wrong assert(self.is_node_id(node_id)) @@ -1094,8 +1176,8 @@ def __frontier_commit_and_push(self): and frontier_node not in self.get_committed() \ and frontier_node not in self.stopped \ and frontier_node not in self.speculated \ - and frontier_node not in self.workset\ - and not self.is_loop_node(frontier_node)\ + and frontier_node not in self.workset \ + and not self.is_loop_node(frontier_node) \ and frontier_node not in self.waiting_for_frontend: ## Commit the node self.commit_node(frontier_node) @@ -1111,7 +1193,21 @@ def __frontier_commit_and_push(self): # If node is still being executed, we cannot progress further else: new_frontier.extend([frontier_node]) - logging.debug(f" > Not commiting node {frontier_node}, readding to frontier") + if frontier_node in self.get_currently_executing(): + logging.debug(f" > Node {frontier_node} is still being executed") + elif frontier_node in self.get_committed(): + logging.debug(f" > Node {frontier_node} is already committed") + elif frontier_node in self.stopped: + logging.debug(f" > Node {frontier_node} is stopped") + elif frontier_node in self.speculated: + logging.debug(f" > Node {frontier_node} is speculated") + elif frontier_node in self.workset: + logging.debug(f" > Node {frontier_node} is in the workset") + elif self.is_loop_node(frontier_node): + logging.debug(f" > Node {frontier_node} is a loop node") + elif frontier_node in self.waiting_for_frontend: + logging.debug(f" > Node {frontier_node} is waiting for frontend") + logging.debug(f" > Not commiting node {frontier_node}, keeping in frontier") ## Update the frontier to the new frontier self.frontier = new_frontier @@ -1181,8 +1277,10 @@ def attempt_move_stopped_to_workset(self): ## TODO: Eventually, in the future, let's add here some form of limit def schedule_work(self, limit=0): + if self.is_first_node_when_env_is_uninitialized(config.SPECULATE_IMMEDIATELY): + logging.debug("Not scheduling work yet, waiting for first Wait") + return # self.log_partial_program_order_info() - logging.debug("Scheduling work...") logging.debug("Rerunning stopped commands") # attempt_move_stopped_to_workset() needs to happen before the node execution self.attempt_move_stopped_to_workset() @@ -1202,6 +1300,7 @@ def schedule_work(self, limit=0): # Nodes to be scheduled are always not committed and not executing def schedule_node(self, cmd_id): # This replaced the old frontier check + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "RunNode", cmd_id) if self.is_next_non_committed_node(cmd_id): # TODO: run this and before committing kill any speculated commands still executing self.run_cmd_non_blocking(cmd_id) @@ -1215,6 +1314,7 @@ def run_cmd_non_blocking(self, node_id: NodeId): ## A command should only be run if it's in the frontier, otherwise it should be spec run logging.debug(f'Running command: {node_id} {self.get_node(node_id)}') logging.debug(f"ExecutingAdd|{node_id}") + self.to_be_resolved_prev[node_id] = self.to_be_resolved[node_id].copy() self.execute_cmd_core(node_id, speculate=False) ## Run a command and add it to the dictionary of executing ones @@ -1252,16 +1352,40 @@ def execute_cmd_core(self, node_id: NodeId, speculate=False): execute_func = executor.async_run_and_trace_command_return_trace_in_sandbox_speculate else: execute_func = executor.async_run_and_trace_command_return_trace - proc, trace_file, stdout, stderr, post_exec_env = execute_func(cmd, node_id, env_file_to_execute_with) - self.commands_currently_executing[node_id] = (proc, trace_file, stdout, stderr, post_exec_env) - logging.debug(f" >>>>> Command {node_id} - {proc.pid} just started executing") + proc, trace_file, stdout, stderr, post_execution_env_file = execute_func(cmd, node_id, env_file_to_execute_with) + self.commands_currently_executing[node_id] = (proc, trace_file, stdout, stderr, post_execution_env_file) + logging.debug(f" >>>>> Command {node_id} - {proc.pid} just started executing - {post_execution_env_file}") + + # This method attempts to add to workset (rerun) + # any command that found to have a dependency through early resolution + def attempt_rerun_pending_nodes(self): + restarted_nodes = set() + for node_id, run_after_nodes in self.run_after.items(): + new_run_after_nodes = run_after_nodes.copy() + if self.get_new_env_file_for_node(node_id) is not None and node_id not in self.pending_to_execute and node_id not in self.get_currently_executing(): + for node in run_after_nodes: + if node not in self.get_currently_executing(): + logging.debug(f"Running node {node} after execution of {node_id}") + self.workset.append(node) + self.pending_to_execute.discard(node) + self.set_latest_env_file_for_node(node, self.get_new_env_file_for_node(node_id)) + restarted_nodes.add(node) + self.prechecked_env.discard(node) + new_run_after_nodes.discard(node) + self.run_after[node_id] = new_run_after_nodes + return restarted_nodes + def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sandbox_dir: str): + log_time_delta_from_named_timestamp("PartialOrder", "RunNode", node_id) + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "PostExecResolution", node_id, key=f"PostExecResolution-{node_id}") + logging.debug(f" --- Node {node_id}, just finished execution ---") self.sandbox_dirs[node_id] = sandbox_dir ## TODO: Store variable file somewhere so that we can return when wait - _proc, trace_file, stdout, stderr, post_exec_env = self.commands_currently_executing.pop(node_id) - logging.debug(f" >>>>> Command {node_id} - {_proc.pid} just finished executing") + + _proc, trace_file, stdout, stderr, post_execution_env_file = self.commands_currently_executing.pop(node_id) + logging.trace(f"ExecutingRemove|{node_id}") # Handle stopped by riker due to network access if int(riker_exit_code) == 159: @@ -1275,13 +1399,12 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand ## Save the completed node info. Note that if the node doesn't commit ## this information will be invalid and rewritten the next time execution ## is completed for this node. - completed_node_info = CompletedNodeInfo(cmd_exit_code, post_exec_env, stdout, sandbox_dir) + completed_node_info = CompletedNodeInfo(cmd_exit_code, post_execution_env_file, stdout, sandbox_dir) self.nodes[node_id].set_completed_info(completed_node_info) ## We no longer add failed commands to the stopped set, ## because this leads to more repetitions than needed ## and does not allow us to properly speculate commands - read_set, write_set = trace.parse_and_gather_cmd_rw_sets(trace_object) rw_set = RWSet(read_set, write_set) self.update_rw_set(node_id, rw_set) @@ -1295,17 +1418,26 @@ def command_execution_completed(self, node_id: NodeId, riker_exit_code:int, sand # do nothing and wait until a new command finishes executing logging.debug("No resolvable nodes were found in this round, nothing will change...") return - + + + log_time_delta_from_named_timestamp("PartialOrder", "PostExecResolutionECCheck", node_id, key=f"PostExecResolution-{node_id}", invalidate=False) # Remove from workset and add it again later if necessary self.workset.remove(node_id) + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "PostExecResolutionFrontendWait", node_id) + ## Here we check if the most recent env has been received. If not, we cannot resolve anything just yet. if self.get_new_env_file_for_node(node_id) is None: logging.debug(f"Node {node_id} has not received its latest env from runtime yet. Waiting...") self.waiting_for_frontend.add(node_id) + + # We will however attempt to resolve dependencies early + self.resolve_dependencies_early(node_id) + restarted_cmds = self.attempt_rerun_pending_nodes() + self.log_partial_program_order_info() ## Here we continue with the normal execution flow else: logging.debug(f"Node {node_id} has already received its latest env from runtime. Examining differences...") - self.resolve_most_recent_envs_and_continue_command_execution(node_id) + self.resolve_most_recent_envs_and_continue_command_execution_check_only_wait_node(node_id) #TODO: Remove ths in the future - we need a more robust approach to check for env diffs. def exclude_insignificant_diffs(self, env_diff_dict): @@ -1331,60 +1463,62 @@ def significant_diff_in_env_dicts(self, only_in_new, only_in_latest, different_i logging.debug("No significant differences found:") return False - def maybe_resolve_most_recent_envs_and_continue_resolution(self, node_id: NodeId): - if node_id in self.waiting_for_frontend: - logging.debug(f"Node {node_id} received its latest env from runtime, continuing resolution.") - self.resolve_most_recent_envs_and_continue_command_execution(node_id) - - def resolve_most_recent_envs_and_continue_command_execution(self, new_env_node: NodeId): - to_check = list(self.waiting_for_frontend) + [new_env_node] - logging.debug(f"Node {new_env_node} received its latest env from runtime. Comparing env with itself and other waiting nodes.") - # Node is no longer waiting to be resolved. It might have not been waiting at all. - self.waiting_for_frontend.discard(new_env_node) - for node_id in to_check: - if self.new_and_latest_env_files_have_significant_differences(self.get_new_env_file_for_node(new_env_node), - self.get_latest_env_file_for_node(node_id)): - logging.debug(f"Significant differences found between new and latest env files for {node_id}.") - logging.debug(f"Assigning node {new_env_node} new env (Wait) as the new latest env of node {node_id} and re-executing.") - # If there are significant differences, set the new env as the latest (the one to run Riker with) - self.set_latest_env_file_for_node(node_id, self.get_new_env_file_for_node(new_env_node)) - # Add the node to the workset again - assert node_id not in self.workset - self.workset.append(node_id) - self.waiting_for_frontend.discard(node_id) - elif node_id == new_env_node: - logging.debug(f"Finding sets of commands that can be resolved after {node_id} finished executing and got its latest env.") - assert(node_id not in self.stopped) - self.add_to_speculated(node_id) - ## We can now call the general resolution method that determines which commands - ## can be resolved (all their dependencies are done executing), and resolves them. - self.resolve_commands_that_can_be_resolved_and_push_frontier() - assert(self.valid()) - else: - logging.debug(f"Node {node_id} has no significant differences with the new env, but has not yet received its wait. Nothing to do for now.") - - def resolve_most_recent_envs_and_continue_command_execution_check_only_wait_node(self, node_id: NodeId): + def update_env_and_restart_nodes(self, node_id: NodeId): + logging.debug(f"Significant differences found between new and latest env files for {node_id}.") + logging.debug(f"Assigning node {node_id} new env (Wait) as the new latest env and re-executing.") + self.set_latest_env_file_for_node(node_id, self.get_new_env_file_for_node(node_id)) + self.prechecked_env.discard(node_id) + if node_id not in self.workset: + self.workset.append(node_id) + self.__kill_all_currently_executing_and_schedule_restart(start=node_id) + new_waiting_for_frontend = self.waiting_for_frontend.copy() + for waiting_for_frontend_node in self.waiting_for_frontend: + if waiting_for_frontend_node not in self.workset and waiting_for_frontend_node in self.get_transitive_closure([node_id]): + self.workset.append(waiting_for_frontend_node) + new_waiting_for_frontend.remove(waiting_for_frontend_node) + most_recent_new_env = self.get_most_recent_possible_new_env_for_node(waiting_for_frontend_node) + self.set_latest_env_file_for_node(waiting_for_frontend_node, most_recent_new_env) + self.prechecked_env.discard(waiting_for_frontend_node) + assert(self.get_new_env_file_for_node(node_id) is not None) + assert(self.get_latest_env_file_for_node(waiting_for_frontend_node) is not None) + self.log_partial_program_order_info() + logging.debug("-") + self.waiting_for_frontend = new_waiting_for_frontend + self.populate_to_be_resolved_dict() + + def resolve_most_recent_envs_check_only_wait_node_early(self, node_id: NodeId, restarted_cmds=None): + if node_id not in self.prechecked_env and self.new_and_latest_env_files_have_significant_differences(self.get_new_env_file_for_node(node_id), + self.get_latest_env_file_for_node(node_id)): + self.update_env_and_restart_nodes(node_id) + else: + self.prechecked_env.add(node_id) + + def resolve_most_recent_envs_and_continue_command_execution_check_only_wait_node(self, node_id: NodeId, restarted_cmds=None): logging.debug(f"Node {node_id} received its latest env from runtime, continuing resolution.") - # Node is no longer waiting to be resolved. It might have not been waiting at all. self.waiting_for_frontend.discard(node_id) - if self.new_and_latest_env_files_have_significant_differences(self.get_new_env_file_for_node(node_id), + if node_id not in self.prechecked_env and self.new_and_latest_env_files_have_significant_differences(self.get_new_env_file_for_node(node_id), self.get_latest_env_file_for_node(node_id)): - logging.debug(f"Significant differences found between new and latest env files for {node_id}.") - logging.debug(f"Assigning node {node_id} new env (Wait) as the new latest env and re-executing.") - # If there are significant differences, set the new env as the latest (the one to run Riker with) - self.set_latest_env_file_for_node(node_id, self.get_new_env_file_for_node(node_id)) - # Add the node to the workset again - if node_id not in self.workset: - self.workset.append(node_id) - else: + self.update_env_and_restart_nodes(node_id) + else: logging.debug(f"Finding sets of commands that can be resolved after {node_id} finished executing and got its latest env") assert(node_id not in self.stopped) + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "WaitingToResolve", node_id) self.add_to_speculated(node_id) - ## We can now call the general resolution method that determines which commands - ## can be resolved (all their dependencies are done executing), and resolves them. + self.resolve_dependencies_early(node_id) + restarted_cmds = self.attempt_rerun_pending_nodes() + logging.debug(f"Restarted after successful env resolution {restarted_cmds}") + self.log_partial_program_order_info() self.resolve_commands_that_can_be_resolved_and_push_frontier() assert(self.valid()) - + + def maybe_resolve_most_recent_envs_and_continue_resolution(self, node_id: NodeId): + if node_id in self.waiting_for_frontend: + logging.debug(f"Node {node_id} received its new env from runtime, continuing full env resolution.") + self.resolve_most_recent_envs_and_continue_command_execution_check_only_wait_node(node_id) + else: + logging.debug(f"Node {node_id} received its new env from runtime, continuing early env resolution.") + self.resolve_most_recent_envs_check_only_wait_node_early(node_id) + def new_and_latest_env_files_have_significant_differences(self, new_env_file, latest_env_file): # Early resolution if same files are compared if new_env_file == latest_env_file: @@ -1408,6 +1542,7 @@ def print_cmd_stderr(self, stderr): def commit_cmd_workspaces(self, to_commit_ids): for cmd_id in sorted(to_commit_ids): + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "CommitNode", cmd_id) workspace = self.sandbox_dirs[cmd_id] if workspace != "": logging.debug(f" (!) Committing workspace of cmd {cmd_id} found in {workspace}") @@ -1415,6 +1550,7 @@ def commit_cmd_workspaces(self, to_commit_ids): logging.debug(commit_workspace_out.decode()) else: logging.debug(f" (!) No need to commit workspace of cmd {cmd_id} as it was run in the main workspace") + log_time_delta_from_start_and_set_named_timestamp("PartialOrder", "CommitNode", cmd_id) def log_rw_sets(self): logging.debug("====== RW Sets " + "=" * 65) @@ -1432,6 +1568,10 @@ def log_partial_program_order_info(self): logging.debug(f"WAITING: {sorted(list(self.speculated))}") logging.debug(f"for FRONTEND: {sorted(list(self.waiting_for_frontend))}") logging.debug(f"TO RESOLVE: {self.to_be_resolved}") + logging.debug(f"PENDING TO EXEC: {self.pending_to_execute}") + logging.debug(f"RUN AFTER: {self.run_after}") + logging.debug(f"New envs: {self.new_envs}") + logging.debug(f"Latest envs: {self.latest_envs}") self.log_rw_sets() logging.debug(f"=" * 80) diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py index 1cb51271..29ebddbf 100644 --- a/parallel-orch/scheduler_server.py +++ b/parallel-orch/scheduler_server.py @@ -1,10 +1,8 @@ import argparse -import copy import logging import signal from util import * import config -import sys from partial_program_order import parse_partial_program_order_from_file, LoopStack, NodeId, parse_node_id ## @@ -28,6 +26,15 @@ def parse_args(): type=str, default=None, help="Set logging output file. Default: stdout") + parser.add_argument("--sandbox-killing", + action="store_true", + default=False, + help="Kill any running overlay instances before commiting to the lower layer") + parser.add_argument("--speculate-immediately", + action="store_true", + default=False, + help="Speculate immediately instead of waiting for the first Wait message.") + args, unknown_args = parser.parse_known_args() return args @@ -96,18 +103,25 @@ def handle_wait(self, input_cmd: str, connection): ## Set the new env file for the node self.partial_program_order.set_new_env_file_for_node(node_id, pash_runtime_vars_file_str) - - ## Attempt to resolve environment differences on waiting partial order nodes - self.partial_program_order.maybe_resolve_most_recent_envs_and_continue_resolution(node_id) + if self.partial_program_order.is_first_node_when_env_is_uninitialized(config.SPECULATE_IMMEDIATELY): + logging.debug("Initializing latest env and speculating") + self.partial_program_order.init_latest_env_files(node_id) + ## Attempt to rerun all pending nodes + self.partial_program_order.attempt_rerun_pending_nodes() + ## Inform the partial order that we received a wait for a node so that it can push loops ## forward and so on. + self.partial_program_order.maybe_unroll(node_id) + + # Moved this below wait_received, in order to support unrolled loop nodes + self.partial_program_order.maybe_resolve_most_recent_envs_and_continue_resolution(node_id) + self.partial_program_order.wait_received(node_id) ## If the node_id is already committed, just return its exit code if node_id in self.partial_program_order.get_committed(): - # TODO: Env check and if no conflicts, commit logging.debug(f'Node: {node_id} found in committed, responding immediately!') self.waiting_for_response[node_id] = connection self.respond_to_pending_wait(node_id) @@ -152,7 +166,7 @@ def respond_to_pending_wait(self, node_id: int): ## Get the completed node info node = self.partial_program_order.get_node(node_id) completed_node_info = node.get_completed_node_info() - msg = f'{completed_node_info.get_exit_code()} {completed_node_info.get_post_exec_env()} {completed_node_info.get_stdout_file()}' + msg = f'{completed_node_info.get_exit_code()} {completed_node_info.get_post_execution_env_file()} {completed_node_info.get_stdout_file()}' response = success_response(msg) ## Send the response self.respond_to_frontend_core(node_id, response) @@ -172,8 +186,6 @@ def handle_command_exec_complete(self, input_cmd: str): if trace_file in self.partial_program_order.banned_files: logging.debug(f'CommandExecComplete: {cmd_id} ignored') return - logging.debug(input_cmd) - ## Gather RWset, resolve dependencies, and progress graph self.partial_program_order.command_execution_completed(cmd_id, exit_code, sandbox_dir) @@ -185,23 +197,31 @@ def process_next_cmd(self): connection, input_cmd = socket_get_next_cmd(self.socket) if(input_cmd.startswith("Init")): + log_time_delta_from_start_and_set_named_timestamp("Scheduler", "PartialOrderInit") connection.close() self.handle_init(input_cmd) - ## TODO: Read the partial order from the given file + ## TODO: Read the partial order from the given file + log_time_delta_from_named_timestamp("Scheduler", "PartialOrderInit") elif (input_cmd.startswith("Daemon Start") or input_cmd == ""): + log_time_delta_from_start_and_set_named_timestamp("Scheduler", "DaemonStart") connection.close() ## This happens when pa.sh first connects to daemon to see if it is on logging.debug(f'PaSh made first contact with scheduler server.') + log_time_delta_from_named_timestamp("Scheduler", "DaemonStart") elif (input_cmd.startswith("CommandExecComplete:")): + log_time_delta_from_start_and_set_named_timestamp("Scheduler", "CommandExecComplete") ## We have received this message from an a runner (tracer +isolation) ## The runner should have already parsed RWsets and serialized them to ## a file. connection.close() self.handle_command_exec_complete(input_cmd) + log_time_delta_from_named_timestamp("Scheduler", "CommandExecComplete") elif (input_cmd.startswith("Wait")): + log_time_delta_from_start_and_set_named_timestamp("Scheduler", "Wait") self.handle_wait(input_cmd, connection) + log_time_delta_from_named_timestamp("Scheduler", "Wait") elif (input_cmd.startswith("Done")): - + log_time_delta_from_start_and_set_named_timestamp("Scheduler", "Done") logging.debug(f'Scheduler server received shutdown message.') logging.debug(f'The partial order was successfully completed.') if not self.partial_program_order.is_completed(): @@ -209,6 +229,7 @@ def process_next_cmd(self): socket_respond(connection, success_response("All finished!")) self.partial_program_order.log_executions() self.done = True + log_time_delta_from_named_timestamp("Scheduler", "Done") else: logging.error(error_response(f'Error: Unsupported command: {input_cmd}')) raise Exception(f'Error: Unsupported command: {input_cmd}') @@ -230,10 +251,12 @@ def check_unsafe_and_waiting(self): ## It should add some work (if possible), and then return immediately. ## It is called once per loop iteration, making sure that there is always work happening def schedule_work(self): + log_time_delta_from_start_and_set_named_timestamp("Scheduler", "ScheduleWork") self.partial_program_order.schedule_work() ## Respond to any waiting nodes that have been deemed to be unsafe self.check_unsafe_and_waiting() + log_time_delta_from_named_timestamp("Scheduler", "ScheduleWork") def run(self): ## The first command should be the daemon start @@ -266,6 +289,7 @@ def terminate_pending_commands(self): def main(): + log_time_delta_from_start("Scheduler", "Scheduler Init") args = init() # Format logging @@ -284,7 +308,10 @@ def main(): logging.getLogger().setLevel(logging.DEBUG) # elif args.debug_level >= 3: # logging.getLogger().setLevel(logging.TRACE) - + + # Set optimization options + config.SANDBOX_KILLING = args.sandbox_killing + config.SPECULATE_IMMEDIATELY = args.speculate_immediately scheduler = Scheduler(config.SCHEDULER_SOCKET) scheduler.run() diff --git a/parallel-orch/trace.py b/parallel-orch/trace.py index d106cce3..e1ec859f 100644 --- a/parallel-orch/trace.py +++ b/parallel-orch/trace.py @@ -4,6 +4,7 @@ from typing import Tuple from enum import Enum import logging +from copy import deepcopy class Ref(Enum): @@ -44,8 +45,15 @@ def resolve_permissions(self, permissions: str): def __str__(self): return f"PathRef({self.ref}, {self.path}, {'r' if self.is_read else '-'}{'w' if self.is_write else '-'}{'x' if self.is_exec else '-'} {'no follow' if self.is_nofollow else ''})" + + def __repr__(self) -> str: + return self.__str__() def get_resolved_path(self): + + if isinstance(self.ref, PathRef): + self.ref = self.ref.get_resolved_path() + # Remove dupliate prefixes if not self.path.startswith("/"): modified_path = "/" + self.path @@ -60,6 +68,7 @@ def get_resolved_path(self): return os.path.join(commonprefix, ref_without_prefix, path_without_prefix).replace("/./", "/") + class PathRefKey: @@ -78,6 +87,9 @@ def __hash__(self): def __str__(self): return f"Key({self.lhs_ref}@{self.env})" + + def __repr__(self) -> str: + return self.__str__() class ExpectResult(): @@ -90,6 +102,15 @@ def __str__(self, ref, result): return f"ExpectResult({self.ref}, {self.result})" +class PipeRef: + + def __init__(self, lhs_ref, env): + self.ref = PathRefKey(env, lhs_ref) + + def __str__(self): + return f"PipeRef({self.lhs_ref})" + + def log_resolved_trace_items(resolved_dict): for k, v in resolved_dict.items(): try: @@ -117,6 +138,9 @@ def is_no_command_prefix(line): def is_new_path_ref(trace_item): return "PathRef" in trace_item +def is_pipe_ref(trace_item): + return "PipeRef" in trace_item + def get_path_ref_id(trace_item): return trace_item.split("=")[0].strip() @@ -174,9 +198,9 @@ def is_launch(line): def parse_launch_command(trace_item): - assignment_prefix = trace_item.split(", ")[0].split( + assignment_prefix = trace_item.split("], ")[0].split( "([Command ")[1].rstrip("]").strip() - assignment_suffix = ", ".join(trace_item.split(", ")[1:]).strip() + assignment_suffix = ", ".join(trace_item.split("], ")[1:]).strip() assignment_string = assignment_suffix[1:-2].split(",") assignments = [(x.split("=")) for x in assignment_string] return assignment_prefix, assignments @@ -210,6 +234,8 @@ def is_expect_result(trace_item): def parse_expect_result(trace_item): return trace_item.lstrip("ExpectResult(").split(")")[0].split(", ") +def parse_pipe_ref(trace_item): + return trace_item.split("] = ")[0].lstrip("[").split(", ") def parse_launch(refs_dict, keys_order, env, line) -> None: assignment_prefix, assignments = parse_launch_command( @@ -220,7 +246,6 @@ def parse_launch(refs_dict, keys_order, env, line) -> None: refs_dict[lhs_ref] = refs_dict[rhs_ref] keys_order.append(lhs_ref) - def add_ref_to_refs_dict(refs_dict, keys_order, lhs_ref, ref): refs_dict[lhs_ref] = ref keys_order.append(lhs_ref) @@ -256,13 +281,25 @@ def parse_new_path_ref(refs_dict, keys_order, env, line): refs_dict[lhs_ref] = path_ref keys_order.append(lhs_ref) - def parse_expect_result_item(expect_result_dict, env, line): line = remove_command_prefix(line).strip() path_ref_id, result = parse_expect_result(line) lhs_ref = PathRefKey(env, path_ref_id) expect_result_dict[lhs_ref] = ExpectResult(lhs_ref, result) - + +def parse_pipe_ref_item(refs_dict, keys_order, env, line): + line = remove_command_prefix(line).strip() + # lhs_ref, rhs_ref = parse_pipe_ref(line) + # Warning HACK: This is a hack to get the correct lhs_ref + # we are probably ok with this because it. + rhs_ref, lhs_ref = parse_pipe_ref(line) + lhs_key = PathRefKey(env, lhs_ref) + lhs_key_rev = PathRefKey(env, rhs_ref) + pipe_ref = PipeRef(rhs_ref, env) + pipe_ref_rev = PipeRef(lhs_ref, env) + refs_dict[lhs_key] = pipe_ref.ref + refs_dict[lhs_key_rev] = pipe_ref_rev.ref + keys_order.append(lhs_key) def parse_rw_sets(trace_object) -> None: # logging.trace("".join(trace_object)) @@ -284,6 +321,9 @@ def parse_rw_sets(trace_object) -> None: # Parses PathRef(...) elif is_new_path_ref(line): parse_new_path_ref(refs_dict, keys_order, env, line) + # Parses PipeRef + elif is_pipe_ref(line): + parse_pipe_ref_item(refs_dict, keys_order, env, line) # Parses ExpectResult(...) elif is_expect_result(line): parse_expect_result_item(expect_result_dict, env, line) @@ -308,19 +348,25 @@ def replace_path_ref_terminal_nodes(refs_dict: dict): refs_dict_new = {} for i, ref in refs_dict.items(): if isinstance(ref, PathRef): + # HACK: This is hard-coded stdout if ref.path == "" and ref.is_nofollow: continue else: - if ref.ref not in refs_dict: + # If ref of ref is string, it means that we reached a terminal node. + if isinstance(ref.ref, str): + pass + elif ref.ref not in refs_dict: key = PathRefKey("No Command", "r1") ref.ref = refs_dict[key].value else: + if isinstance(refs_dict[ref.ref], Ref): - ref.ref = refs_dict[ref.ref].value + ref.ref = deepcopy(str(refs_dict[ref.ref].value)) else: - ref.ref = os.getcwd() - refs_dict_new[i] = ref + ref.ref = deepcopy(refs_dict[ref.ref]) + assert(i not in refs_dict_new) + refs_dict_new[i] = deepcopy(ref) return refs_dict_new @@ -406,3 +452,22 @@ def parse_exit_code(trace_object) -> int: for line in reversed(trace_object): if "Exit(" in line: return int(line.split("Exit(")[1].rstrip(")\n")) + +# Trace can be called as a script with the trace file to analyze as an argument +def main(): + logging.basicConfig(level=logging.DEBUG) + trace_file = sys.argv[1] + with open(trace_file, "r") as f: + trace_object = f.readlines() + read_set, write_set = parse_and_gather_cmd_rw_sets(trace_object) + print("Read set:") + for r in read_set: + print(r) + print("Write set:") + for w in write_set: + print(w) + print("Exit code:") + print(parse_exit_code(trace_object)) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/parallel-orch/util.py b/parallel-orch/util.py index 17aa7a59..1ed96118 100644 --- a/parallel-orch/util.py +++ b/parallel-orch/util.py @@ -5,8 +5,9 @@ import subprocess import tempfile import time -import difflib import re +import psutil +import signal def ptempfile(): fd, name = tempfile.mkstemp(dir=config.PASH_SPEC_TMP_PREFIX) @@ -58,49 +59,6 @@ def socket_respond(connection: socket.socket, message: str): connection.sendall(bytes_message) connection.close() -# Check if the process with the given PID is alive. -def is_process_alive(pid) -> bool: - try: - os.kill(pid, 0) - except OSError: - return False - else: - return True - -# Get all child process PIDs of a process -def get_child_processes(parent_pid) -> int: - try: - output = subprocess.check_output(['pgrep', '-P', str(parent_pid)]) - return [int(pid) for pid in output.decode('utf-8').split()] - except subprocess.CalledProcessError: - # No child processes were found - return [] - -# Note: Check this function as it does not seem the right way to kill a proc. -# SIGKILL should be sent once and for all. -# Kills the process with the provided PID. -# Returns True if the process was successfully killed, False otherwise. -def kill_process(pid: int) -> bool: - kill_attempts = 0 - while is_process_alive(pid) and kill_attempts < config.MAX_KILL_ATTEMPTS: - try: - # Send SIGKILL signal for a forceful kill - subprocess.check_call(['kill', '-9', str(pid)]) - time.sleep(0.005) # Sleep for 5 milliseconds before checking again - except subprocess.CalledProcessError: - logging.debug(f"Failed to kill PID {pid}.") - kill_attempts += 1 - - if kill_attempts >= config.MAX_KILL_ATTEMPTS: - logging.warning(f"Gave up killing PID {pid} after {config.MAX_KILL_ATTEMPTS} attempts.") - return False - - return True - - -# HACK: Parsing an env file like this is likely problematic. -# TODO: We can leave this as-is for now, but we should consider -# using a more robust approach for comparing the env files def parse_env_string_to_dict(content): # Parse scalar string vars scalar_vars_string = re.findall(r'declare (?:-x|--)? (\w+)="([^"]*)"', content, re.DOTALL) @@ -138,3 +96,76 @@ def compare_env_strings(file1_content, file2_content): dict1 = parse_env_string_to_dict(file1_content) dict2 = parse_env_string_to_dict(file2_content) return compare_dicts(dict1, dict2) + +def log_time_delta_from_start(module: str, action: str, node=None): + logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time From start:{to_milliseconds_str(time.time() - config.START_TIME)}") + +def set_named_timestamp(action: str, node=None, key=None): + if key is None: + key = f"{action}{',' + str(node) if node is not None else ''}" + config.NAMED_TIMESTAMPS[key] = time.time() + +def invalidate_named_timestamp(action: str, node=None, key=None): + if key is None: + key = f"{action}{',' + str(node) if node is not None else ''}" + del config.NAMED_TIMESTAMPS[key] + +def log_time_delta_from_start_and_set_named_timestamp(module: str, action: str, node=None, key=None): + try: + set_named_timestamp(action, node, key) + logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time from start:{to_milliseconds_str(time.time() - config.START_TIME)}") + except KeyError: + logging.error(f"Named timestamp {key} already exists") + +def log_time_delta_from_named_timestamp(module: str, action: str, node=None, key=None, invalidate=True): + try: + if key is None: + key = f"{action}{',' + str(node) if node is not None else ''}" + logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time from start:{to_milliseconds_str(time.time() - config.START_TIME)}|Step time:{to_milliseconds_str(time.time() - config.NAMED_TIMESTAMPS[key])}") + if invalidate: + invalidate_named_timestamp(action, node, key) + except KeyError: + logging.error(f"Named timestamp {key} does not exist") + +def to_milliseconds_str(seconds: float) -> str: + return f"{seconds * 1000:.3f}ms" + + + +def get_all_child_processes(pid): + try: + parent = psutil.Process(pid) + except psutil.NoSuchProcess: + return [] + + children = parent.children(recursive=True) + parent_of_parent = parent.parent() + logging.critical("PARENT_PROCESS: " + str(parent_of_parent)) + logging.critical("MAIN_PROCESS: " + str(parent)) + all_processes = [parent] + children + for process in all_processes: + logging.critical("PROCESS: " + str(process)) + return all_processes + + +def kill_process_tree(pid, sig=signal.SIGTERM): + processes = get_all_child_processes(pid) + for proc in processes: + try: + os.kill(proc.pid, sig) + except (psutil.NoSuchProcess): + pass + except (PermissionError): + logging.critical("NO PERMISSION") + except (ProcessLookupError): + logging.critical("PROCESS LOOKUP ERROR") + + # Check if processes are still alive + alive_processes = [] + for proc in processes: + try: + if proc.is_running(): + alive_processes.append(f"{proc}-({proc.status()})") + except: + pass + return alive_processes diff --git a/report/README.md b/report/README.md new file mode 100644 index 00000000..2b4dc1a5 --- /dev/null +++ b/report/README.md @@ -0,0 +1,74 @@ +# hs Benchmark Directory README + +Welcome to the benchmark directory of the `hs`. This directory contains the essential tools and scripts to run benchmarks, analyze logs, and generate reports. + +## Overview + +The benchmarking tool provides an interface to run different benchmarks, collect performance metrics, and visualize the results through plots. It supports a wide range of features including: +- Running benchmarks with `bash` and `hs`. +- Comparing the outputs and performance of `Bash` and `hs`. +- Generating Gantt charts for each benchmark. +- Producing detailed logs and CSV results. + +## Environment Variables + +The benchmarking tool sets up and exports a few essential environment variables for the system: + +- `WORKING_DIR`: The directory for the benchmarks and reports. +- `TEST_SCRIPT_DIR`: The directory containing benchmark scripts. +- `RESOURCE_DIR`: The directory to store resources required by benchmarks. +- `PASH_TOP`: The directory of `pash`. +- `PASH_SPEC_TOP`: The top directory of `hs`. + +## Command-Line Interface + +The primary command-line interface for the benchmark runner includes: +- `--no-plots`: Do not generate plots. +- `--no-logs`: Do not save log files. +- `--csv-output`: Save the results in CSV format. + +## Benchmark Configuration + +Benchmarks are configured using the `benchmark_config.json` file. Each benchmark in the configuration has the following properties: + +- `name`: The name of the benchmark. +- `env`: A list of environment variables required by the benchmark. +- `pre_execution_script`: A list of commands to run before executing the benchmark. Useful for fetching data or setting up the environment. +- `command`: The command or script to benchmark. +- `orch_args`: Arguments to pass to the `orch` system when running the benchmark. + +Example: + +```json +[ + { + "name": "Dgsh 1.sh - 120M", + "env": ["INPUT_FILE={RESOURCE_DIR}/in120M.xml"], + "pre_execution_script": ["wget -nc -O in120M.xml http://aiweb.cs.washington.edu/research/projects/xmltk/xmldata/data/dblp/dblp.xml"], + "command": "{TEST_SCRIPT_DIR}/dgsh/1.sh", + "orch_args": "-d 2 --sandbox-killing" + } +] +``` + +## Running Benchmarks + +To run benchmarks: + +1. Navigate to the directory containing the benchmark runner (`cd ./report` from the top-level directory). +2. Execute the benchmark runner with desired arguments, e.g., `python3 benchmark_runner.py --csv-output`. + +After running, the results, including logs, plots, and CSV files (if selected), will be saved in the `report_output` directory. + +## Results Interpretation + +The results include: +- Execution times for `bash` and `hs`. +- A comparison of the execution times. +- Validity checks for the outputs. +- Detailed execution logs. +- Gantt and bar charts visualizing the execution. + +## Contributions + +Feel free to contribute to the benchmark suite by adding new benchmarks or improving existing ones. Ensure that any new benchmarks have the necessary configuration in the `benchmark_config.json` file. diff --git a/report/benchmark_config.json b/report/benchmark_config.json new file mode 100644 index 00000000..4da3ed1b --- /dev/null +++ b/report/benchmark_config.json @@ -0,0 +1,101 @@ +[ + { + "name": "Dgsh 6.sh", + "env": ["INPUT_FILE={RESOURCE_DIR}/words.txt"], + "pre_execution_script": ["wget -nc -O words.txt https://raw.githubusercontent.com/dwyl/english-words/master/words.txt"], + "command": "{TEST_SCRIPT_DIR}/dgsh/6.sh", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 2.sh (no function) - Τry Repo", + "pre_execution_script": ["git clone https://github.com/binpash/try.git"], + "command": "{TEST_SCRIPT_DIR}/dgsh/2_no_func.sh", + "working_dir": "{RESOURCE_DIR}/try", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 2.sh (no function) - PaSh Repo", + "pre_execution_script": ["git clone https://github.com/binpash/pash.git"], + "command": "{TEST_SCRIPT_DIR}/dgsh/2_no_func.sh", + "working_dir": "{RESOURCE_DIR}/try", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 2.sh (no function) - PaSh Repo", + "pre_execution_script": ["git clone https://github.com/binpash/pash.git"], + "command": "{TEST_SCRIPT_DIR}/dgsh/2_no_func.sh", + "working_dir": "{RESOURCE_DIR}/try", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 3.sh - Riker Repo", + "pre_execution_script": ["git clone https://github.com/curtsinger-lab/riker.git"], + "command": "{TEST_SCRIPT_DIR}/dgsh/3.sh", + "working_dir": "{RESOURCE_DIR}/riker", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 4.sh (no function) - PaSh Repo", + "pre_execution_script": ["git clone https://github.com/curtsinger-lab/riker.git"], + "command": "{TEST_SCRIPT_DIR}/dgsh/2_no_func.sh", + "working_dir": "{RESOURCE_DIR}/riker", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 5.sh - 2M", + "env": ["INPUT_FILE={RESOURCE_DIR}/in2M.xml"], + "pre_execution_script": ["wget -nc -O in2M.xml http://aiweb.cs.washington.edu/research/projects/xmltk/xmldata/data/mondial/mondial-3.0.xml"], + "command": "{TEST_SCRIPT_DIR}/dgsh/5.sh", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 5.sh - 120M", + "env": ["INPUT_FILE={RESOURCE_DIR}/in120M.xml"], + "pre_execution_script": ["wget -nc -O in120M.xml http://aiweb.cs.washington.edu/research/projects/xmltk/xmldata/data/dblp/dblp.xml"], + "command": "{TEST_SCRIPT_DIR}/dgsh/5.sh", + "orch_args": "-d 2" + }, + { + "name": "Dgsh 6.sh", + "env": ["INPUT_FILE={RESOURCE_DIR}/words.txt"], + "pre_execution_script": ["wget -nc -O words.txt https://raw.githubusercontent.com/dwyl/english-words/master/words.txt"], + "command": "{TEST_SCRIPT_DIR}/dgsh/6.sh", + "orch_args": "-d 2" + }, + { + "name": "7.sh - kill", + "env": ["INPUT_FILE={RESOURCE_DIR}/words.txt"], + "pre_execution_script": ["wget -nc -O weblog.log https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/apache_logs/apache_logs"], + "command": "{TEST_SCRIPT_DIR}/dgsh/8_no_func.sh", + "orch_args": "-d 2 --sandbox-killing" + }, + { + "name": "7.sh - kill", + "env": ["INPUT_FILE={RESOURCE_DIR}/words.txt"], + "pre_execution_script": ["wget -nc -O weblog.log https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/apache_logs/apache_logs"], + "command": "{TEST_SCRIPT_DIR}/dgsh/8_no_func.sh", + "orch_args": "-d 2 --sandbox-killing" + }, + { + "name": "Dgsh 9.sh - Riker Repo", + "env": ["INPUT={RESOURCE_DIR}/../../deps/riker/"], + "pre_execution_script": ["git clone https://github.com/curtsinger-lab/riker.git"], + "command": "{TEST_SCRIPT_DIR}/dgsh/9.sh", + "working_dir": "{RESOURCE_DIR}/riker", + "orch_args": "-d 2" + }, + { + "name": "18.sh - 120M", + "env": ["INPUT_FILE={RESOURCE_DIR}/in120M.xml"], + "pre_execution_script": ["wget -nc -O in120M.xml http://aiweb.cs.washington.edu/research/projects/xmltk/xmldata/data/dblp/dblp.xml"], + "command": "{TEST_SCRIPT_DIR}/dgsh/18.sh", + "orch_args": "-d 2" + }, + { + "name": "18.sh - 120M - kill", + "env": ["INPUT_FILE={RESOURCE_DIR}/in120M.xml"], + "pre_execution_script": ["wget -nc -O in120M.xml http://aiweb.cs.washington.edu/research/projects/xmltk/xmldata/data/dblp/dblp.xml"], + "command": "{TEST_SCRIPT_DIR}/dgsh/18.sh", + "orch_args": "-d 2 --sandbox-killing" + } +] diff --git a/report/benchmark_plots.py b/report/benchmark_plots.py new file mode 100644 index 00000000..7e7996d2 --- /dev/null +++ b/report/benchmark_plots.py @@ -0,0 +1,73 @@ +import os +import matplotlib.pyplot as plt +# import scienceplots + +# plt.style.use('science') + +# Plot a comparison of execution times for Bash and hs. +def plot_benchmark_times_combined(benchmarks, bash_times, orch_times, output_dir, filename): + fig, ax = plt.subplots(figsize=(10,6)) + + # Define bar width and positions + bar_width = 0.35 + index = range(len(benchmarks)) + + bar1 = ax.bar(index, bash_times, bar_width, label='bash', color='b') + bar2 = ax.bar([i+bar_width for i in index], orch_times, bar_width, label='hs', color='r') + + ax.set_xlabel('Benchmarks') + ax.set_ylabel('Execution Time (s)') + ax.set_title('Execution Time Comparison: Bash vs orch') + ax.set_xticks([i + bar_width/2 for i in index]) + ax.set_xticklabels(benchmarks) + ax.legend() + + plt.tight_layout() + plt.savefig(os.path.join(output_dir, f"{filename}.pdf")) + +def plot_benchmark_times_individual(benchmarks, bash_times, orch_times, output_dir, filename): + num_benchmarks = len(benchmarks) + fig, axes = plt.subplots(num_benchmarks, 1, figsize=(10, 6*num_benchmarks)) + # Check if only one benchmark, else wrap axes in a list + if num_benchmarks == 1: + axes = [axes] + for ax, benchmark, bash_time, pash_time in zip(axes, benchmarks, bash_times, orch_times): + bar_width = 0.2 + labels = ['Bash', 'hs'] + times = [bash_time, pash_time] + ax.bar(labels, times, width=bar_width, color=['b', 'r']) + ax.set_ylabel('Execution Time (s)') + ax.set_title(f'Execution Time Comparison for {benchmark}: Bash vs hs') + + plt.tight_layout() + plt.savefig(os.path.join(output_dir, f"{filename}.pdf")) + + +def plot_gantt(activities, output_dir, filename, simple=False): + + if simple: + activities = [activity for activity in activities if activity[0].startswith("RunNode,") or activity[0] == "Wait"] + + # Set figure height based on the number of activities + fig_height = len(activities) + fig, ax = plt.subplots(figsize=(15, 0.2 * fig_height)) + + # Sort the activities by their start time + activities.sort(key=lambda x: x[1]) + + bar_height = 0.8 + gap = 0.2 + + for index, activity in enumerate(activities): + action, start_time, duration = activity + ax.broken_barh([(start_time, duration)], (index*(bar_height + gap), bar_height), facecolors='blue', edgecolor='black') + ax.text(start_time + duration/2, index*(bar_height + gap) + bar_height/2, action, ha='center', va='center', fontsize=6, color='gray') + + ax.set_xlabel('Time (ms)') + ax.set_title(f'Gantt Chart of {filename.strip("_gantt.pdf")}') + ax.set_yticks([i*(bar_height + gap) + bar_height/2 for i in range(len(activities))]) + ax.set_yticklabels([activity[0] for activity in activities], rotation=30, fontsize=8) + ax.grid(True) + + plt.tight_layout() + plt.savefig(os.path.join(output_dir, f"{filename}.pdf")) diff --git a/report/benchmark_report.py b/report/benchmark_report.py new file mode 100644 index 00000000..0eab564b --- /dev/null +++ b/report/benchmark_report.py @@ -0,0 +1,251 @@ +import subprocess +import time +import json +import os +from benchmark_plots import * +import logging +import difflib +import argparse +import csv + + + +# Setting and exporting environment variables (same as tests for now). +# This will change in the future. +os.environ['ORCH_TOP'] = os.environ.get('ORCH_TOP', subprocess.check_output(['git', 'rev-parse', '--show-toplevel', '--show-superproject-working-tree']).decode('utf-8').strip()) +os.environ['WORKING_DIR'] = os.path.join(os.environ['ORCH_TOP'], 'report') +os.environ['TEST_SCRIPT_DIR'] = os.path.join(os.environ['WORKING_DIR'], 'benchmarks') +os.environ['RESOURCE_DIR'] = os.path.join(os.environ['WORKING_DIR'], 'resources') +os.environ['PASH_TOP'] = os.path.join(os.environ['ORCH_TOP'], 'deps', 'pash') +os.environ['PASH_SPEC_TOP'] = os.path.join(os.environ['ORCH_TOP']) + +BASH_COMMAND = "/bin/bash" +ORCH_COMMAND = os.path.join(os.environ['ORCH_TOP'], 'pash-spec.sh') +REPORT_OUTPUT_DIR = os.path.join(os.environ['WORKING_DIR'], 'report_output') + + +def parse_args(): + parser = argparse.ArgumentParser(description="Benchmark and report interface for a system.") + parser.add_argument('--no-plots', action='store_true', help="Do not print plots.") + parser.add_argument('--no-logs', action='store_true', help="Do not save log files.") + parser.add_argument('--csv-output', action='store_true', help="Save results in CSV format.") + return parser.parse_args() + + +def save_log_data(log_data, output_dir, filename): + if args.no_logs: + return + with open(os.path.join(output_dir, filename), 'w') as f: + f.write(log_data) + + +def parse_logs_into_activities(log_data): + info_lines = [line.replace("INFO:root:>|", "").split("|") for line in log_data.split("\n") if line.startswith("INFO:root:>|")] + # Define a regex pattern to extract data from the log lines + pattern = r">\|(?P[\w\-,]+)\|Time from start:(?P[\d\.]+)ms" + step_time_pattern = r"Step time:(?P[\d\.]+)ms" + + activities = [] + + for line in info_lines: + if len(line) == 4: + activity = line[1] + end_time = float(line[2].split(":")[1].rstrip("ms")) + step_time = float(line[3].split(":")[1].rstrip("ms")) + start_time = end_time - step_time + activities.append((activity, start_time, step_time)) + return activities + +def replace_with_env_var(input_string): + format_args = { + "TEST_SCRIPT_DIR": os.environ.get("TEST_SCRIPT_DIR", os.getcwd()), + "RESOURCE_DIR": os.environ.get("RESOURCE_DIR", os.getcwd()) + } + replaced_string = input_string.format(**format_args) + return replaced_string + +def run_pre_execution_command(command, working_dir=os.getcwd()): + print("Running pre-execution command:", command) + process = subprocess.Popen(command, cwd=working_dir) + process.wait() + return process.returncode + +def run_command(command, working_dir=os.getcwd()): + print("Running (and timing) command: ", " ".join(command)) + start_time = time.time() + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=working_dir) + stdout, stderr = process.communicate() + end_time = time.time() + return (end_time - start_time, stdout.decode('utf-8'), stderr.decode('utf-8')) + +def run_command_with_orch(command, orch_args, working_dir=os.getcwd()): + orch_args = orch_args.split(" ") + print("Running (and timing) command with orch: ", " ".join([ORCH_COMMAND] + orch_args + command)) + print([ORCH_COMMAND] + orch_args + command) + start_time = time.time() + process = subprocess.Popen([ORCH_COMMAND] + orch_args + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=working_dir, env=os.environ) + stdout, stderr = process.communicate() + end_time = time.time() + return (end_time - start_time, stdout.decode('utf-8'), stderr.decode('utf-8')) + +def compare_results(bash_output, orch_output): + bash_lines = bash_output.splitlines() + if len(bash_output) <= 10000: + orch_lines = orch_output.splitlines()[:10000] + else: + bash_lines = bash_lines[:10000] + orch_lines = orch_output.splitlines()[:10000] + # Compare lines + d = difflib.ndiff(bash_lines, orch_lines) + return [diff for diff in d if diff.startswith('- ') or diff.startswith('+ ')] + + +def print_results(benchmark_name, bash_time, orch_time, diff_lines, diff_percentage): + if bash_time > orch_time: + speedup = bash_time / orch_time + comparison_result = f"hs is {speedup:.2f}x ({bash_time - orch_time:.2f}s) faster than Bash" + else: + speedup = orch_time / bash_time + comparison_result = f"hs is {speedup:.2f}x ({orch_time - bash_time:.2f}s) slower than Bash" + print("-" * 40) + print(f"Results for benchmark: {benchmark_name}") + print(f"Bash Execution Time: {round(bash_time, 3)}s") + print(f"hs Execution Time: {round(orch_time, 3)}s") + print(f"Valid: {'Yes' if len(diff_lines) == 0 else 'No - see below'}") + if len(diff_lines) > 0: + for line in diff_lines: + print(line) + print("-" * 40) + print(comparison_result) + print() + if args.csv_output: + csv_filename = os.path.join(REPORT_OUTPUT_DIR, f"results.csv") + with open(csv_filename, 'a') as csv_file: + writer = csv.writer(csv_file) + valid = 'Yes' if len(diff_lines) == 0 else 'No' + writer.writerow([benchmark_name, bash_time, orch_time, valid, comparison_result]) + + +def print_exec_time_for_cmds(orch_outpt, benchmark_name): + # Split the log into lines and filter the relevant ones + relevant_lines = [line.replace("INFO:root:>|PartialOrder|RunNode,", "") for line in orch_outpt.split("\n") if line.startswith("INFO:root:>|PartialOrder|RunNode,") and "Step time:" in line] + # Extract lines with RunNode commands and their step times + node_and_times = [(int(line.split("|")[0]), float(line.split("|")[1].split(":")[1][:-2]), float(line.split("|")[2].split(":")[1][:-2])) for line in relevant_lines] + + # Total number of times a RunNode command was executed + total_run_node_commands = len(node_and_times) + + # Total time of all RunNode commands + total_time = sum([entry[2] for entry in node_and_times]) + + # Extract and sum the total time of the step per node + node_times = {} + node_distinct_times = {} + counts = {} + for node, _, time in node_and_times: + if node in node_times: + node_times[node] += time + node_distinct_times[node].append(time) + counts[node] += 1 + else: + node_times[node] = time + node_distinct_times[node] = [time] + counts[node] = 1 + + time_lost_per_node = {node: sum(node_distinct_times[node]) - node_distinct_times[node][-1] for node in node_times} + + print("-" * 40) + print(f"Total number of times a RunNode command was executed: {total_run_node_commands}") + print(f"Total time of all RunNode commands: {total_time:.3f}ms") + print("\nTotal execution time per node:") + for node, time_lost in sorted(time_lost_per_node.items(), key=lambda x: x[1], reverse=True): + print(f"{node:2d}: {node_times[node]:.3f}ms ({counts[node]} times) | Avg: {sum(node_distinct_times[node])/len(node_distinct_times[node]):.3f}ms | {node_distinct_times[node]} | Time lost: {time_lost:.3f}ms") + print("-" * 40) + print(f"Total time lost: {sum(time_lost_per_node.values()):.02f}ms") + print("=" * 100) + + if args.csv_output: + csv_filename = os.path.join(REPORT_OUTPUT_DIR, f"{benchmark_name}_execution_times.csv") + with open(csv_filename, 'w') as csv_file: + writer = csv.writer(csv_file) + writer.writerow(["Node", "Time (ms)", "Execution Count", "Average Time (ms)", "Distinct Times", "Time Lost (ms)"]) + for node, time_lost in sorted(time_lost_per_node.items(), key=lambda x: x[1], reverse=True): + writer.writerow([node, node_times[node], counts[node], sum(node_distinct_times[node])/len(node_distinct_times[node]), node_distinct_times[node], time_lost]) + + + +def export_env_vars(env_vars): + for env_var in env_vars: + lhs, rhs = env_var.split("=") + rhs = replace_with_env_var(rhs) + os.environ[lhs] = rhs + + +def main(): + + # Load benchmark configurations + with open(os.path.join(os.environ.get('WORKING_DIR'), 'benchmark_config.json'), 'r') as f: + benchmarks_config = json.load(f) + + bash_times = [] + orch_times = [] + + # Create output dir for reports + os.makedirs(REPORT_OUTPUT_DIR, exist_ok=True) + + if args.csv_output: + csv_filename = os.path.join(REPORT_OUTPUT_DIR, f"results.csv") + with open(csv_filename, 'w') as csv_file: + writer = csv.writer(csv_file) + writer.writerow(["Benchmark", "Bash Execution Time", "hs Execution Time", "Valid", "Comparison"]) + + for benchmark in benchmarks_config: + print("=" * 100) + # Set up preferred environment + export_env_vars(benchmark.get('env', {})) + # Create resource dir if non-existent + os.makedirs(os.environ.get('RESOURCE_DIR'), exist_ok=True) + # Run pre-execution commands + for pre_command in benchmark.get('pre_execution_script', []): + print(f"{pre_command}") + split_pre_command = replace_with_env_var(pre_command).split(" ") + run_pre_execution_command(split_pre_command, os.environ.get('RESOURCE_DIR')) + + working_dir = replace_with_env_var(benchmark.get('working_dir', os.environ.get('TEST_SCRIPT_DIR'))) + + bash_cmd_str = [BASH_COMMAND] + replace_with_env_var(benchmark['command']).split(" ") + bash_time, bash_output, _bash_error = run_command(bash_cmd_str, working_dir) + orch_cmd_str = replace_with_env_var(benchmark['command']).split(" ") + orch_time, orch_output, orch_error = run_command_with_orch(orch_cmd_str, benchmark['orch_args'], working_dir) + bash_times.append(bash_time) + orch_times.append(orch_time) + diff_lines = compare_results(bash_output, orch_output) + diff_percentage = abs((bash_time - orch_time) / bash_time) * 100 + + print_results(benchmark['name'], bash_time, orch_time, diff_lines, diff_percentage) + + activities = parse_logs_into_activities(log_data=orch_error) + if not args.no_plots: + plot_gantt(activities, REPORT_OUTPUT_DIR, f"{benchmark['name']}_gantt", simple=True) + + print_exec_time_for_cmds(orch_error, benchmark['name']) + + # Instead of always saving the logs, check the argument: + if not args.no_logs: + save_log_data(orch_error, REPORT_OUTPUT_DIR, f"{benchmark['name']}_log.log") + + + + # Plot the results + benchmark_names = [benchmark['name'] for benchmark in benchmarks_config] + + print(f"Execution graphs can be found in {REPORT_OUTPUT_DIR}") + + if not args.no_plots: + plot_benchmark_times_combined(benchmark_names, bash_times, orch_times, REPORT_OUTPUT_DIR, "benchmark_times_combined") + plot_benchmark_times_individual(benchmark_names, bash_times, orch_times, REPORT_OUTPUT_DIR, "benchmark_times_individual") + + +if __name__ == "__main__": + args = parse_args() + main() diff --git a/report/benchmarks/dgsh/1.sh b/report/benchmarks/dgsh/1.sh new file mode 100755 index 00000000..8312c443 --- /dev/null +++ b/report/benchmarks/dgsh/1.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +## Initialize the necessary temporary files +file1=$(mktemp) +cat $INPUT_FILE >"$file1" +printf 'File type:\t' +file - <"$file1" + +printf 'Original size:\t' +wc -c <"$file1" + +printf 'xz:\t\t' +xz -c <"$file1" | wc -c + +printf 'bzip2:\t\t' +bzip2 -c <"$file1" | wc -c + +printf 'gzip:\t\t' +gzip -c <"$file1" | wc -c \ No newline at end of file diff --git a/report/benchmarks/dgsh/17.sh b/report/benchmarks/dgsh/17.sh new file mode 100644 index 00000000..718935c2 --- /dev/null +++ b/report/benchmarks/dgsh/17.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# Initialize the necessary temporary files +file1=$(mktemp) +file2=$(mktemp) +file3=$(mktemp) +file4=$(mktemp) + +# Save the ls output to a temporary file +ls -n > "$file1" + +# Reorder fields in DIR-like way +awk '!/^total/ {print $6, $7, $8, $1, sprintf("%8d", $5), $9}' "$file1" > "$file2" + +# Count number of files +wc -l "$file1" | tr -d \\n > "$file3" +echo -n ' File(s) ' >> "$file3" +awk '{s += $5} END {printf("%d bytes\n", s)}' "$file1" >> "$file3" + +# Count number of directories and print label for number of dirs and calculate free bytes +grep -c '^d' "$file1" | tr -d \\n > "$file4" +df -h . | awk '!/Use%/{print " Dir(s) " $4 " bytes free"}' >> "$file4" + +# Display the results +cat "$file2" "$file3" "$file4" diff --git a/report/benchmarks/dgsh/18.sh b/report/benchmarks/dgsh/18.sh new file mode 100644 index 00000000..effa236f --- /dev/null +++ b/report/benchmarks/dgsh/18.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +# Initialize the necessary temporary files +file1=$(mktemp) +file2=$(mktemp) +file3=$(mktemp) + +# Read the input stream and save to a temporary file +cat $INPUT_FILE > "$file1" + +# Process the input in two different ways +cut -d , -f 5-6 "$file1" > "$file2" +cut -d , -f 2-4 "$file1" > "$file3" + +# Merge the processed results +paste -d , "$file2" "$file3" diff --git a/report/benchmarks/dgsh/1_reversed.sh b/report/benchmarks/dgsh/1_reversed.sh new file mode 100644 index 00000000..6c728b6d --- /dev/null +++ b/report/benchmarks/dgsh/1_reversed.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +## Initialize the necessary temporary files +file1=$(mktemp) +cat $INPUT_FILE >"$file1" +printf 'File type:\t' +file - <"$file1" + +printf 'Original size:\t' +wc -c <"$file1" + +printf 'gzip:\t\t' +gzip -c <"$file1" | wc -c + +printf 'bzip2:\t\t' +bzip2 -c <"$file1" | wc -c + +printf 'xz:\t\t' +xz -c <"$file1" | wc -c diff --git a/report/benchmarks/dgsh/2_no_func.sh b/report/benchmarks/dgsh/2_no_func.sh new file mode 100644 index 00000000..37e28857 --- /dev/null +++ b/report/benchmarks/dgsh/2_no_func.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +## Note: Needs to be run on a big git repository to make sense (maybe linux) + +## Initialize the necessary temporary files +file1=$(mktemp) +git log --format="%an:%ad" --date=default "$@" >"$file1" + +echo "Authors ordered by number of commits" +# Order by frequency +awk -F: '{print $1}' <"$file1" | sort | uniq | sort -rn + +echo "Days ordered by number of commits" +# Order by frequency +awk -F: '{print substr($2, 1, 3)}' <"$file1" | sort | uniq | sort -rn diff --git a/report/benchmarks/dgsh/3.sh b/report/benchmarks/dgsh/3.sh new file mode 100644 index 00000000..93ecbf7b --- /dev/null +++ b/report/benchmarks/dgsh/3.sh @@ -0,0 +1,130 @@ +#!/bin/bash + +## Note: Needs to be run on a big git repository to make sense (maybe linux) + +## Initialize the necessary temporary files +file1=$(mktemp) +file2=$(mktemp) +file3=$(mktemp) +file4=$(mktemp) + +find "$@" \( -name \*.c -or -name \*.h \) -type f -print0 >"$file1" + +echo -n 'FNAMELEN: ' + +tr \\0 \\n <"$file1" | +sed 's|^.*/||' | +awk '{s += length($1); n++} END { + if (n>0) + print s / n; + else + print 0; }' + +xargs -0 /bin/cat <"$file1" >"$file2" + +sed 's/#/@/g;s/\\[\\"'\'']/@/g;s/"[^"]*"/""/g;'"s/'[^']*'/''/g" <"$file2" | + cpp -P >"$file3" + +# Structure definitions +echo -n 'NSTRUCT: ' + +egrep -c 'struct[ ]*{|struct[ ]*[a-zA-Z_][a-zA-Z0-9_]*[ ]*{' <"$file3" +#}} (match preceding openings) + +# Type definitions +echo -n 'NTYPEDEF: ' +grep -cw typedef <"$file3" + +# Use of void +echo -n 'NVOID: ' +grep -cw void <"$file3" + +# Use of gets +echo -n 'NGETS: ' +grep -cw gets <"$file3" + +# Average identifier length +echo -n 'IDLEN: ' + +tr -cs 'A-Za-z0-9_' '\n' <"$file3" | +sort -u | +awk '/^[A-Za-z]/ { len += length($1); n++ } END { + if (n>0) + print len / n; + else + print 0; }' + +echo -n 'CHLINESCHAR: ' +wc -lc <"$file2" | + awk '{OFS=":"; print $1, $2}' + +echo -n 'NCCHAR: ' +sed 's/#/@/g' <"$file2" | +cpp -traditional -P | +wc -c | +awk '{OFMT = "%.0f"; print $1/1000}' + +# Number of comments +echo -n 'NCOMMENT: ' +egrep -c '/\*|//' <"$file2" + +# Occurences of the word Copyright +echo -n 'NCOPYRIGHT: ' +grep -ci copyright <"$file2" + +# C files +find "$@" -name \*.c -type f -print0 >"$file2" + +# Convert to newline separation for counting +tr \\0 \\n <"$file2" >"$file3" + +# Number of C files +echo -n 'NCFILE: ' +wc -l <"$file3" + +# Number of directories containing C files +echo -n 'NCDIR: ' +sed 's,/[^/]*$,,;s,^.*/,,' <"$file3" | +sort -u | +wc -l + +# C code +xargs -0 /bin/cat <"$file2" >"$file3" + +# Lines and characters +echo -n 'CLINESCHAR: ' +wc -lc <"$file3" | +awk '{OFS=":"; print $1, $2}' + +# C code without comments and strings +sed 's/#/@/g;s/\\[\\"'\'']/@/g;s/"[^"]*"/""/g;'"s/'[^']*'/''/g" <"$file3" | +cpp -P >"$file4" + +# Number of functions +echo -n 'NFUNCTION: ' +grep -c '^{' <"$file4" + +# Number of gotos +echo -n 'NGOTO: ' +grep -cw goto <"$file4" + +# Occurrences of the register keyword +echo -n 'NREGISTER: ' +grep -cw register <"$file4" + +# Number of macro definitions +echo -n 'NMACRO: ' +grep -c '@[ ]*define[ ][ ]*[a-zA-Z_][a-zA-Z0-9_]*(' <"$file4" +# Number of include directives +echo -n 'NINCLUDE: ' +grep -c '@[ ]*include' <"$file4" + +# Number of constants +echo -n 'NCONST: ' +grep -ohw '[0-9][x0-9][0-9a-f]*' <"$file4" | wc -l + + +# Header files +echo -n 'NHFILE: ' +find "$@" -name \*.h -type f | +wc -l \ No newline at end of file diff --git a/report/benchmarks/dgsh/4.sh b/report/benchmarks/dgsh/4.sh new file mode 100644 index 00000000..e2f0c003 --- /dev/null +++ b/report/benchmarks/dgsh/4.sh @@ -0,0 +1,19 @@ + +#!/bin/bash + +## Initialize the necessary temporary files +file1=$(mktemp) +file2=$(mktemp) + +# Create list of files +find "$@" -type f | +xargs openssl md5 | +sed 's/^MD5(//;s/)= / /' | +sort -k2 > "$file1" +awk '{print $2}' < "$file1" | uniq -d > "$file2" +join -2 2 "$file2" "$file1" | +awk ' +BEGIN {ORS=""} +$1 != prev && prev {print "\n"} +END {if (prev) print "\n"} +{if (prev) print " "; prev = $1; print $2}' diff --git a/report/benchmarks/dgsh/5.sh b/report/benchmarks/dgsh/5.sh new file mode 100755 index 00000000..0c639dad --- /dev/null +++ b/report/benchmarks/dgsh/5.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +## Initialize the necessary temporary files +file1=$(mktemp) +file2=$(mktemp) +file3=$(mktemp) +file4=$(mktemp) + +# export LC_ALL=C + +cat $INPUT_FILE >"$file1" + +# Find errors + +# Obtain list of words in text +cat "$file1" | +tr -cs A-Za-z \\n | +tr A-Z a-z | +sort -u > "$file2" + +# Ensure dictionary is compatibly sorted +cat "$file1" | +sort /usr/share/dict/words > "$file3" + +# List errors as a set difference +comm -23 "$file2" "$file3" > "$file4" + +fgrep -f "$file4" -i --color -w -C 2 "$file1" diff --git a/report/benchmarks/dgsh/6.sh b/report/benchmarks/dgsh/6.sh new file mode 100755 index 00000000..ad1001d4 --- /dev/null +++ b/report/benchmarks/dgsh/6.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +## Initialize the necessary temporary files +file1=$(mktemp) +file2=$(mktemp) +file3=$(mktemp) +file4=$(mktemp) +file5=$(mktemp) + +cat $INPUT_FILE > $file1 + +# Consistent sorting across machines +# export LC_ALL=C + +# Stream input from file and split input one word per line +# Create list of unique words +tr -cs a-zA-Z '\n' < "$file1" | +sort -u > "$file2" + +# List two-letter palindromes +sed 's/.*\(.\)\(.\)\2\1.*/p: \1\2-\2\1/;t;g' "$file2" > "$file3" + +# List four consecutive consonants +sed -E 's/.*([^aeiouyAEIOUY]{4}).*/c: \1/;t;g' "$file2" > "$file4" + +# List length of words longer than 12 characters +awk '{if (length($1) > 12) print "l:", length($1); + else print ""}' "$file2" > "$file5" + +# Paste the four streams side-by-side +# List only words satisfying one or more properties +paste "$file2" "$file3" "$file4" "$file5" | +fgrep : diff --git a/report/benchmarks/dgsh/7.sh b/report/benchmarks/dgsh/7.sh new file mode 100755 index 00000000..a29a8e8d --- /dev/null +++ b/report/benchmarks/dgsh/7.sh @@ -0,0 +1,152 @@ +#!/bin/bash + +# Consistent sorting +export LC_ALL=C + +# Print initial header only if DGSH_DRAW_EXIT is not set +if [ -z "${DGSH_DRAW_EXIT}" ] +then + cat < "$file_initial" + +# Number of accesses +echo -n 'Number of accesses: ' +wc -l < "$file_initial" + +# Total transferred bytes +awk '{s += $NF} END {print s}' "$file_initial" > "$file_bytes" +echo -n 'Number of Gbytes transferred: ' +awk '{print $1 / 1024 / 1024 / 1024}' "$file_bytes" + +# Process Host names +awk '{print $1}' "$file_initial" > "$file_hosts" + +# Number of accesses +echo -n 'Number of accesses: ' +wc -l < "$file_hosts" + +# Sorted hosts +sort "$file_hosts" > "$file_sorted_hosts" + +# Unique hosts +uniq "$file_sorted_hosts" > "$file_unique_hosts" +echo -n 'Number of hosts: ' +wc -l < "$file_unique_hosts" + +# Number of TLDs +awk -F. '$NF !~ /[0-9]/ {print $NF}' "$file_unique_hosts" | sort -u | wc -l +echo -n 'Number of top level domains: ' + +# Top 10 hosts +echo +echo "Top 10 Hosts" +echo "Top 10 Hosts" | sed 's/./-/g' + +uniq -c "$file_sorted_hosts" | sort -rn | head -10 +echo + +# Top 20 TLDs +echo +echo "Top 20 Level Domain Accesses" +echo "Top 20 Level Domain Accesses" | sed 's/./-/g' + +awk -F. '$NF !~ /^[0-9]/ {print $NF}' "$file_sorted_hosts" | sort | uniq -c | sort -rn | head -20 +echo + +# Domains +awk -F. 'BEGIN {OFS = "."} $NF !~ /^[0-9]/ {$1 = ""; print}' "$file_sorted_hosts" | sort > "$file_domains" + +# Number of domains +echo -n 'Number of domains: ' +uniq "$file_domains" | wc -l + +# Top 10 domains +echo +echo "Top 10 domains" +echo "Top 10 domains" | sed 's/./-/g' +uniq -c "$file_domains" | sort -rn | head -10 < "$file_domains" + +# Hosts by volume +echo +echo "Top 10 Hosts by Transfer" +echo "Top 10 Hosts by Transfer" | sed 's/./-/g' +awk ' {bytes[$1] += $NF} +END {for (h in bytes) print bytes[h], h}' "$file_initial" | sort -rn | head -10 + +# Sorted page name requests +awk '{print $7}' "$file_initial" | sort > "$file_requests" + +# Top 20 area requests (input is already sorted) +echo +echo "Top 20 area requests" +echo "Top 20 area requests" | sed 's/./-/g' +awk -F/ '{print $2}' "$file_requests" | uniq -c | sort -rn | head -20 +# Number of different pages +echo -n 'Number of different pages: ' +uniq "$file_requests" | wc -l + +# Top 20 requests +echo +echo "Top 20 requests" +echo "Top 20 requests" | sed 's/./-/g' +uniq -c "$file_requests" | sort -rn | head -20 + +# Access time: dd/mmm/yyyy:hh:mm:ss +awk '{print substr($4, 2)}' "$file_initial" > "$file_times" + +# Just dates +awk -F: '{print $1}' "$file_times" > "$file_dates" + +# Number of days +echo -n 'Accesses per day: ' +uniq "$file_dates" | wc -l > "$file_day_count" +awk ' +BEGIN { + getline NACCESS < "'"$file_initial"'" +} +{print NACCESS / $1}' "$file_day_count" + +echo -n 'MBytes per day: ' +awk ' +BEGIN { + getline NXBYTES < "'"$file_bytes"'" +} +{print NXBYTES / $1 / 1024 / 1024}' "$file_day_count" + +echo +echo "Accesses by Date" +echo "Accesses by Date" | sed 's/./-/g' +uniq -c < "$file_dates" + +# Accesses by day of week +echo +echo "Accesses by Day of Week" +echo "Accesses by Day of Week" | sed 's/./-/g' +sed 's|/|-|g' "$file_dates" | date -f - +%a 2>/dev/null | sort | uniq -c | sort -rn + +# Accesses by Local Hour +echo +echo "Accesses by Local Hour" +echo "Accesses by Local Hour" | sed 's/./-/g' +awk -F: '{print $2}' "$file_times" | sort | uniq -c diff --git a/report/benchmarks/dgsh/8_no_func.sh b/report/benchmarks/dgsh/8_no_func.sh new file mode 100755 index 00000000..4aab48ce --- /dev/null +++ b/report/benchmarks/dgsh/8_no_func.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +# Consistent sorting across machines +# export LC_ALL=C + +# Temporary files +file1=$(mktemp) +file2=$(mktemp) +file3=$(mktemp) +file4=$(mktemp) + +cat $INPUT_FILE > $file1 +cat $file1 + +# Split input one word per line +tr -cs a-zA-Z '\n' < "$file1" > "$file2" + +# Digram frequency +echo "Digram frequency" +perl -ne 'for ($i = 0; $i < length($_) - 2; $i++) { + print substr($_, $i, 2), "\n"; +}' < "$file2" | +awk '{count[$1]++} END {for (i in count) print count[i], i}' | +sort -rn + +# Trigram frequency +echo "Trigram frequency" +perl -ne 'for ($i = 0; $i < length($_) - 3; $i++) { + print substr($_, $i, 3), "\n"; +}' < "$file2" | +awk '{count[$1]++} END {for (i in count) print count[i], i}' | +sort -rn + +# Word frequency +echo "Word frequency" +awk '{count[$1]++} END {for (i in count) print count[i], i}' < "$file2" | +sort -rn + +# Store number of characters to use in awk below + +nchars=$(wc -c < "$file1") + +# Character frequency +# Print absolute +echo "Character frequency" +sed 's/./&\n/g' < "$file1" | +awk '{count[$1]++} END {for (i in count) print count[i], i}' | +sort -rn | tee "$file3" + +# Print relative +# echo "Relative character frequency" +# awk -v NCHARS=$nchars 'BEGIN { +# OFMT = "%.2g%%"} +# {print $1, $2, $1 / NCHARS * 100}' "$file3" \ No newline at end of file diff --git a/report/benchmarks/dgsh/9.sh b/report/benchmarks/dgsh/9.sh new file mode 100644 index 00000000..25b20804 --- /dev/null +++ b/report/benchmarks/dgsh/9.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +## Initialize the necessary temporary files +file1=$(mktemp) +file2=$(mktemp) +file3=$(mktemp) + +# Find object files and print defined symbols +find "$INPUT" -name "*.o" | xargs nm > "$file1" + +# List all defined (exported) symbols +awk 'NF == 3 && $2 ~ /[A-Z]/ {print $3}' "$file1" | sort > "$file2" + +# List all undefined (imported) symbols +awk '$1 == "U" {print $2}' "$file1" | sort > "$file3" + +# Print exports that are not imported +comm -23 "$file2" "$file3" diff --git a/report/scheduling_report.py b/report/scheduling_report.py deleted file mode 100644 index c687cb13..00000000 --- a/report/scheduling_report.py +++ /dev/null @@ -1,239 +0,0 @@ -#!/bin/env python3 - -from enum import Enum -import os -import sys -from dateutil import parser -import plotly.express as px -from datetime import datetime, date - - -class CommandState(Enum): - EXECUTING = "Executing" - EXECUTING_SANDBOXED = "Executing sandboxed" - STOPPED_NETWORK = "Stopped: network" - STOPPED_ERROR = "Stopped: ec!=0" - WAITING = "Waiting" - COMMITTED = "Committed" - NO_STATE = "No state" - - -class PashSpecTraceObject: - - def __init__(self, timestamp: datetime, action: str, message): - self.action = action - self.message = message - self.timestamp = timestamp.time() - - def __str__(self): - return f"PashSpecTraceObject({self.timestamp}|{self.action}|{self.message})" - - -class SchedulingStateSet: - - def handle_node(self, object): - self.nodes = [node_id for node_id in object.message.split(",")] - self.nodes.reverse() - self.start_timestamp = object.timestamp - - def __init__(self): - self.cmd_states = [] - self.unresolved_states = dict() - self.nodes = [] - self.marks = [] - self.bash_timestamp = None - self.start_timestamp = None - - def plot(self): - self.cmd_states.sort(key=lambda x: x["Command_Id"]) - fig1 = px.timeline(self.cmd_states, - y='Command_Id', - x_start="Start", - x_end="Finish", - hover_data=['Command_Id', 'State'], - color="State", - category_orders={"0": 1, "1": 2, "2": 3, "3": 4, "4": 5}) - fig1.update_layout(showlegend=True, xaxis_tickformat='%M:%S,%L', - yaxis_title="Command ID", xaxis_title="Time (ms)") - fig1.update_yaxes(categoryorder='array', categoryarray=self.nodes) - fig1.update_traces(marker=dict(size=12, - line=dict(width=2, - color='DarkSlateGrey')), - selector=dict(mode='markers')) - - # Add commit markers - y = [mark["y"] for mark in self.marks if mark["event"] == "Commit"] - x = [mark["x"] for mark in self.marks if mark["event"] == "Commit"] - fig1.add_scatter(y=y, x=x, - marker_symbol="diamond", - marker=dict(color='Black', size=16), - mode="markers", - name="Commit") - - # Add execution markers - y = [mark["y"] - for mark in self.marks if mark["event"] == CommandState.EXECUTING] - x = [mark["x"] - for mark in self.marks if mark["event"] == CommandState.EXECUTING] - fig1.add_scatter(y=y, x=x, - marker_symbol="circle", - marker=dict(color='Black', size=16), - mode="markers", - name="Normal exec start") - - # Add sandbox execution markers - y = [mark["y"] for mark in self.marks if mark["event"] - == CommandState.EXECUTING_SANDBOXED] - x = [mark["x"] for mark in self.marks if mark["event"] - == CommandState.EXECUTING_SANDBOXED] - fig1.add_scatter(y=y, x=x, - marker_symbol="circle", - marker=dict(color='Red', size=16), - mode="markers", - name="Sandbox exec start") - - if self.bash_timestamp is not None: - fig1.update_layout(shapes=[ - dict( - type='line', - yref='paper', y0=0, y1=1, - xref='x', x0=self.bash_timestamp, x1=self.bash_timestamp - ) - ]) - fig1.show() - - def add_task(self, node_id, start, end, state): - self.cmd_states.append( - dict(Command_Id=str(node_id), Start=start, Finish=end, State=state.value)) - - def handle_executing_add(self, object): - node_id = int(object.message) - self.unresolved_states[node_id] = ( - object.timestamp, CommandState.EXECUTING) - self.marks.append(dict(y=node_id, x=object.timestamp, - event=CommandState.EXECUTING)) - - def handle_executing_sandbox_add(self, object): - node_id = int(object.message) - self.unresolved_states[node_id] = ( - object.timestamp, CommandState.EXECUTING_SANDBOXED) - self.marks.append(dict(y=node_id, x=object.timestamp, - event=CommandState.EXECUTING_SANDBOXED)) - - def handle_executing_remove(self, object): - node_id = int(object.message) - assert node_id in self.unresolved_states - start, state = self.unresolved_states.pop(node_id) - end = object.timestamp - self.add_task(node_id, start, end, state) - self.add_task(node_id, end, end, state) - - def handle_frontier_add(self, object): - node_id = int(object.message) - self.unresolved_states[node_id] = ( - object.timestamp, CommandState.FRONTIER) - - def handle_frontier_remove(self, object): - pass - - def handle_stopped_add(self, object): - node_str, reason = object.message.split(":") - node_id = int(node_str) - if reason == "error": - self.unresolved_states[node_id] = ( - object.timestamp, CommandState.STOPPED_ERROR) - elif reason == "network": - self.unresolved_states[node_id] = ( - object.timestamp, CommandState.STOPPED_NETWORK) - else: - assert False - - def handle_stopped_remove(self, object): - node_id = int(object.message) - assert node_id in self.unresolved_states - start, state = self.unresolved_states.pop(node_id) - end = object.timestamp - self.add_task(node_id, start, end, state) - - def handle_waiting_add(self, object): - node_id = int(object.message) - self.unresolved_states[node_id] = ( - object.timestamp, CommandState.WAITING) - - def handle_waiting_remove(self, object): - node_id = int(object.message) - assert node_id in self.unresolved_states - start, state = self.unresolved_states.pop(node_id) - end = object.timestamp - self.add_task(node_id, start, end, state) - - def handle_commit(self, object): - nodes = [str(node) for node in object.message.split(",")] - for node in nodes: - self.marks.append(dict(y=node, x=object.timestamp, event="Commit")) - - def handle_bash(self, object): - self.bash_timestamp = datetime.strptime(object.message, "%M:%S.%f") - print(datetime.strptime(object.message, "%M:%S.%f")) - pass - - -def adjust_timestamp(state_set: SchedulingStateSet, trace_object): - t = state_set.start_timestamp - print(datetime(1900, 1, 1, 0, 0, 0) + (datetime.combine(date.min, - trace_object.timestamp) - datetime.combine(date.min, t))) - trace_object.timestamp = datetime(1900, 1, 1, 0, 0, 0) + (datetime.combine( - date.min, trace_object.timestamp) - datetime.combine(date.min, t)) - - -def parse_trace_objects(trace_file): - with open(trace_file) as logfile: - lines = logfile.read().split("\n") - lines = [tuple(line.split("|")[1:]) - for line in lines if line.startswith("TRACE|")] - trace_objects = [PashSpecTraceObject(parser.parse( - timestamp), action, message) for timestamp, action, message in lines] - return trace_objects - - -def main(): - states = SchedulingStateSet() - trace_file = os.path.join(os.path.abspath(sys.argv[1])) - lines = parse_trace_objects(trace_file) - for object in lines: - action = object.action - if action == "Nodes": - states.handle_node(object) - elif action == "ExecutingAdd": - adjust_timestamp(states, object) - states.handle_executing_add(object) - elif action == "ExecutingSandboxAdd": - adjust_timestamp(states, object) - states.handle_executing_sandbox_add(object) - if action == "ExecutingRemove": - adjust_timestamp(states, object) - states.handle_executing_remove(object) - elif action == "StoppedAdd": - adjust_timestamp(states, object) - states.handle_stopped_add(object) - elif action == "StoppedRemove": - adjust_timestamp(states, object) - states.handle_stopped_remove(object) - elif action == "WaitingAdd": - adjust_timestamp(states, object) - states.handle_waiting_add(object) - elif action == "WaitingRemove": - adjust_timestamp(states, object) - states.handle_waiting_remove(object) - elif action == "Commit": - adjust_timestamp(states, object) - states.handle_commit(object) - elif action == "Bash": - states.handle_bash(object) - else: - assert False, f"Not implemented handle for action: {action}" - states.plot() - - -if __name__ == "__main__": - main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..958a6b27 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +matplotlib>=3.7.0 \ No newline at end of file diff --git a/scripts/install_deps_ubuntu20.sh b/scripts/install_deps_ubuntu20.sh index fd1dc5b5..d54171b0 100755 --- a/scripts/install_deps_ubuntu20.sh +++ b/scripts/install_deps_ubuntu20.sh @@ -8,6 +8,8 @@ sudo update-alternatives --install /usr/bin/cram cram /usr/bin/cram3 100 export PASH_SPEC_TOP=${PASH_SPEC_TOP:-$(git rev-parse --show-toplevel --show-superproject-working-tree)} export PASH_TOP=${PASH_TOP:-$PASH_SPEC_TOP/deps/pash} +pip3 install --user -r $PASH_SPEC_TOP/requirements.txt + ## Download submodule dependencies git submodule update --init --recursive diff --git a/test/README.md b/test/README.md new file mode 100644 index 00000000..233dbd95 --- /dev/null +++ b/test/README.md @@ -0,0 +1,86 @@ +## README for `hs` Test Suite + +### Overview +This directory contains the test suite for `hs`. The main test script is `test_orch.sh`, which automates the process of running various tests on the `hs` and `bash` to ensure consistency and correctness. + +### Directory Structure + +- **test_scripts**: Contains the individual test scripts. +- **misc**: Contains utility scripts used by the test cases. +- **output_bash**: Directory to save the output of scripts executed by `bash`. +- **output_orch**: Directory to save the output of scripts executed by `hs`. +- **results**: Stores the result status and logs for each test. +- **parse_cmd_repetitions.py**: Python script to parse command repetitions from the `hs` logs. + +### Main Test Script (`test_orch.sh`) + +The main test script `test_orch.sh` starts by setting up environment variables and directories. It then proceeds to define utility functions: + +- `cleanup()`: Removes cache and clears output directories. +- `test_repetitions()`: Validates the repetition of commands using `parse_cmd_repetitions.py`. +- `run_test()`: Executes a given test for both `bash` and `hs` and compares the outputs. +- Various test functions, e.g., `test_single_command()`, `test_local_vars_1()`, etc. + +Finally, it runs the set of defined tests, provides a summary of the results, and outputs logs for both passed and failed tests. + +### Running Tests + +To run all tests: +``` +./test_orch.sh +``` + +To run specific tests: +``` +./test_orch.sh [testname] +``` + +Before running your scripts, you can set the DEBUG environment variable to provide detailed logging information. Assign a value of 2 to DEBUG to get the most detailed logs. + +```bash +export DEBUG=2 +``` + +Since the logs are printed to stderr, you can redirect them to a file to facilitate easier analysis: + +```bash +./test_orch.sh [test_name] 2>logs.txt +``` + +### Test Results + +At the end of execution, a summary is presented: + +1. List of tests that produced identical outputs on both `bash` and `hs`. +2. List of tests that produced non-identical outputs. +3. Overall summary indicating the number of tests passed. + +The detailed logs for passed and failed tests can be found in the `results` directory. + +### Adding More Tests + +If you would like to expand the test suite by adding more tests, follow these guidelines: + +1. **Create Test Script**: Write a new Bash script that performs the desired test. For example, if you wish to test a new functionality named `test_XXX.sh`, create a file with that name under the `test_scripts` directory.

+Utilize utility scripts from the `misc` directory, like `sleep_and_grep.sh`, to help maintain a modular design. This way, changes made to utility functions can propagate across multiple tests. + +2. **Update Main Test Suite**: + In the main test suite (shown above), add a new function named similarly to your test script. This function should prepare any required input files and run your test script. For example: + + ```bash + test_XXX() + { + local shell=$1 + # Setup input data here if required + $shell $TEST_SCRIPT_DIR/test_XXX.sh + } + ``` + +3. **Integrate into Test Runner**: + Add a call to your `run_test` function with your new function as the argument. This ensures it's part of the suite when no specific test names are given. Add this before `if [ "$#" -eq 0 ]; then`, for instance: + + ```bash + run_test test_XXX + ``` + + Optionally, you can provide expected repetition values as a second argument if required by the test.