Skip to content

Commit

Permalink
Merge pull request #656 from binpash/future
Browse files Browse the repository at this point in the history
PaSh version 0.12
  • Loading branch information
angelhof committed Mar 7, 2023
2 parents 1c16dad + c90f441 commit 7596f62
Show file tree
Hide file tree
Showing 70 changed files with 638 additions and 4,707 deletions.
21 changes: 12 additions & 9 deletions compiler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from util import *

## Global
__version__ = "0.11" # FIXME add libdash version
__version__ = "0.12" # FIXME add libdash version
GIT_TOP_CMD = [ 'git', 'rev-parse', '--show-toplevel', '--show-superproject-working-tree']
if 'PASH_TOP' in os.environ:
PASH_TOP = os.environ['PASH_TOP']
Expand Down Expand Up @@ -131,7 +131,7 @@ def add_common_arguments(parser):
help="(experimental) disable eager nodes before merging nodes",
action="store_true")
parser.add_argument("--no_daemon",
help="Run the compiler everytime we need a compilation instead of using the daemon",
help="(obsolete) does nothing -- Run the compiler everytime we need a compilation instead of using the daemon",
action="store_true",
default=False)
parser.add_argument("--parallel_pipelines",
Expand All @@ -143,13 +143,18 @@ def add_common_arguments(parser):
help="configure the batch size of r_split (default: 1MB)",
default=1000000)
parser.add_argument("--r_split",
help="does nothing -- only here for old interfaces (not used anywhere in the code)",
help="(obsolete) does nothing -- only here for old interfaces (not used anywhere in the code)",
action="store_true")
parser.add_argument("--dgsh_tee",
help="does nothing -- only here for old interfaces (not used anywhere in the code)",
help="(obsolete) does nothing -- only here for old interfaces (not used anywhere in the code)",
action="store_true")
parser.add_argument("--speculative",
help="(experimental) use the speculative execution preprocessing and runtime (NOTE: this has nothing to do with --speculation, which is actually misnamed, and should be named concurrent compilation/execution and is now obsolete)",
action="store_true",
default=False)
## This is misnamed, it should be named concurrent compilation/execution
parser.add_argument("--speculation",
help="(experimental) run the original script during compilation; if compilation succeeds, abort the original and run only the parallel (quick_abort) (Default: no_spec)",
help="(obsolete) does nothing -- run the original script during compilation; if compilation succeeds, abort the original and run only the parallel (quick_abort) (Default: no_spec)",
choices=['no_spec', 'quick_abort'],
default='no_spec')
parser.add_argument("--termination",
Expand Down Expand Up @@ -196,10 +201,10 @@ def pass_common_arguments(pash_arguments):
arguments.append(pash_arguments.log_file)
if (pash_arguments.no_eager):
arguments.append("--no_eager")
if (pash_arguments.no_daemon):
arguments.append("--no_daemon")
if (pash_arguments.distributed_exec):
arguments.append("--distributed_exec")
if (pash_arguments.speculative):
arguments.append("--speculative")
if (pash_arguments.parallel_pipelines):
arguments.append("--parallel_pipelines")
if (pash_arguments.daemon_communicates_through_unix_pipes):
Expand All @@ -210,8 +215,6 @@ def pass_common_arguments(pash_arguments):
arguments.append(str(pash_arguments.debug))
arguments.append("--termination")
arguments.append(pash_arguments.termination)
arguments.append("--speculation")
arguments.append(pash_arguments.speculation)
arguments.append("--width")
arguments.append(str(pash_arguments.width))
if(not pash_arguments.config_path == ""):
Expand Down
97 changes: 45 additions & 52 deletions compiler/orchestrator_runtime/pash_init_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,20 @@ export PASH_DEBUG_LEVEL=0
## Check flags
export pash_output_time_flag=1
export pash_execute_flag=1
export pash_speculation_flag=0 # By default there is no speculation
export pash_dry_run_compiler_flag=0
export pash_assert_compiler_success_flag=0
export pash_checking_speculation=0
export pash_checking_log_file=0
export pash_checking_debug_level=0
export pash_avoid_pash_runtime_completion_flag=0
export pash_profile_driven_flag=1
export pash_daemon=1
export pash_parallel_pipelines=0
export pash_daemon_communicates_through_unix_pipes_flag=0
export pash_speculative_flag=0
export show_version=0
export distributed_exec=0

for item in "$@"
do
if [ "$pash_checking_speculation" -eq 1 ]; then
export pash_checking_speculation=0
if [ "no_spec" == "$item" ]; then
export pash_speculation_flag=0
elif [ "quick_abort" == "$item" ]; then
## TODO: Fix how speculation interacts with dry_run, assert_compiler_success
export pash_speculation_flag=1
echo "$$: Error: Speculation quick-abort is currently unmaintained!" 1>&2
echo "Exiting..." 1>&2
exit 1
else
echo "$$: Unknown value for option --speculation" 1>&2
exit 1
fi
fi

if [ "$pash_checking_log_file" -eq 1 ]; then
export pash_checking_log_file=0
Expand All @@ -67,10 +50,6 @@ do
export pash_assert_compiler_success_flag=1
fi

if [ "--speculation" == "$item" ]; then
pash_checking_speculation=1
fi

if [ "--log_file" == "$item" ]; then
pash_checking_log_file=1
fi
Expand All @@ -87,10 +66,6 @@ do
pash_checking_debug_level=1
fi

if [ "--no_daemon" == "$item" ]; then
export pash_daemon=0
fi

if [ "--parallel_pipelines" == "$item" ]; then
export pash_parallel_pipelines=1
fi
Expand All @@ -99,6 +74,10 @@ do
export pash_daemon_communicates_through_unix_pipes_flag=1
fi

if [ "--speculative" == "$item" ]; then
export pash_speculative_flag=1
fi

if [ "--distributed_exec" == "$item" ]; then
export distributed_exec=1
fi
Expand Down Expand Up @@ -161,6 +140,8 @@ export -f pash_redir_output
export -f pash_redir_all_output
export -f pash_redir_all_output_always_execute

source "$PASH_TOP/compiler/orchestrator_runtime/pash_orch_lib.sh"


if [ "$pash_daemon_communicates_through_unix_pipes_flag" -eq 1 ]; then
pash_communicate_daemon()
Expand Down Expand Up @@ -188,10 +169,7 @@ else
pash_communicate_daemon()
{
local message=$1
pash_redir_output echo "Sending msg to daemon: $message"
daemon_response=$(echo "$message" | nc -U "$DAEMON_SOCKET")
pash_redir_output echo "Got response from daemon: $daemon_response"
echo "$daemon_response"
pash_communicate_unix_socket "compilation-server" "${DAEMON_SOCKET}" "${message}"
}

pash_communicate_daemon_just_send()
Expand All @@ -201,27 +179,13 @@ else

pash_wait_until_daemon_listening()
{
## Only wait for a limited amount of time.
## If the daemon cannot start listening in ~ 1 second,
## then it must have crashed or so.
i=0
## This is a magic number to make sure that we wait enough
maximum_retries=1000
## For some reason, `nc -z` doesn't work on livestar (it always returns error)
## and therefore we need to send something.
until echo "Daemon Start" 2> /dev/null | nc -U "$DAEMON_SOCKET" >/dev/null 2>&1 ;
do
## TODO: Can we wait for the daemon in a better way?
sleep 0.01
i=$((i+1))
if [ $i -eq $maximum_retries ]; then
echo "Error: Maximum retries: $maximum_retries exceeded when waiting for daemon to bind to socket!" 1>&2
echo "Exiting..." 1>&2
exit 1
fi
done
pash_wait_until_unix_socket_listening "compilation-server" "${DAEMON_SOCKET}"
}
fi
export -f pash_communicate_daemon
export -f pash_communicate_daemon_just_send
export -f pash_wait_until_daemon_listening


if [ "$distributed_exec" -eq 1 ]; then
pash_communicate_worker_manager()
Expand All @@ -233,6 +197,35 @@ if [ "$distributed_exec" -eq 1 ]; then
echo "$manager_response"
}
fi
export -f pash_communicate_daemon
export -f pash_communicate_daemon_just_send
export -f pash_wait_until_daemon_listening

if [ "${pash_speculative_flag}" -eq 1 ]; then
source "$RUNTIME_DIR/speculative/pash_spec_init_setup.sh"
else
## Normal PaSh mode
## Exports $daemon_pid
start_server()
{
python3 -S "$PASH_TOP/compiler/pash_compilation_server.py" "$@" &
export daemon_pid=$!
## Wait until daemon has established connection
pash_wait_until_daemon_listening
}

cleanup_server()
{
local daemon_pid=$1
## Only wait for daemon if it lives (it might be dead, rip)
if ps -p "$daemon_pid" > /dev/null
then
## Send and receive from daemon
msg="Done"
daemon_response=$(pash_communicate_daemon "$msg")
if [ "$distributed_exec" -eq 1 ]; then
# kill $worker_manager_pid
manager_response=$(pash_communicate_worker_manager "$msg")
fi
wait 2> /dev/null 1>&2
fi
}
fi

42 changes: 42 additions & 0 deletions compiler/orchestrator_runtime/pash_orch_lib.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash

## Contains utility function that are used by multiple pash frontends

pash_wait_until_unix_socket_listening()
{
local server_name=$1
local socket=$2
## Only wait for a limited amount of time.
## If the daemon cannot start listening in ~ 1 second,
## then it must have crashed or so.
i=0
## This is a magic number to make sure that we wait enough
maximum_retries=1000
## For some reason, `nc -z` doesn't work on livestar (it always returns error)
## and therefore we need to send something.
until echo "Daemon Start" 2> /dev/null | nc -U "$socket" >/dev/null 2>&1 ;
do
## TODO: Can we wait for the daemon in a better way?
sleep 0.01
i=$((i+1))
if [ $i -eq $maximum_retries ]; then
echo "Error: Maximum retries: $maximum_retries exceeded when waiting for server: ${server_name} to bind to socket: ${socket}!" 1>&2
echo "Exiting..." 1>&2
exit 1
fi
done
}

pash_communicate_unix_socket()
{
local server_name=$1
local socket=$2
local message=$3
pash_redir_output echo "Sending msg to ${server_name}: $message"
daemon_response=$(echo "$message" | nc -U "${socket}")
pash_redir_output echo "Got response from ${server_name}: $daemon_response"
echo "$daemon_response"
}

export -f pash_wait_until_unix_socket_listening
export -f pash_communicate_unix_socket
48 changes: 48 additions & 0 deletions compiler/orchestrator_runtime/speculative/pash_spec_init_setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash

source "$PASH_TOP/compiler/orchestrator_runtime/pash_orch_lib.sh"

pash_spec_communicate_scheduler()
{
local message=$1
pash_communicate_unix_socket "PaSh-Spec-scheduler" "${PASH_SPEC_SCHEDULER_SOCKET}" "${message}"
}

pash_spec_communicate_scheduler_just_send()
{
pash_spec_communicate_scheduler "$1"
}

pash_spec_wait_until_scheduler_listening()
{
pash_wait_until_unix_socket_listening "PaSh-Spec-scheduler" "${PASH_SPEC_SCHEDULER_SOCKET}"
}


start_server()
{
python3 -S "$PASH_SPEC_TOP/parallel-orch/scheduler_server.py" "$@" &
export daemon_pid=$!
## Wait until daemon has established connection
pash_spec_wait_until_scheduler_listening
}

cleanup_server()
{
local daemon_pid=$1
## Only wait for daemon if it lives (it might be dead, rip)
if ps -p "$daemon_pid" > /dev/null
then
## Send and receive from daemon
msg="Done"
daemon_response=$(pash_spec_communicate_scheduler "$msg")
wait 2> /dev/null 1>&2
fi
}

export -f pash_spec_communicate_scheduler
export -f pash_spec_communicate_scheduler_just_send
export -f pash_spec_wait_until_scheduler_listening
export -f start_server
export -f cleanup_server

17 changes: 17 additions & 0 deletions compiler/orchestrator_runtime/speculative/speculative_runtime.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash


## TODO: Ask the scheduler to let us know when a command has been committed and what is its exit code.
## TODO: Define the client in pash_spec_init_setup (which should be sourced by pash_init_setup)

## TODO: Then we need to extend the scheduler to also support this protocol (unix sockets only) and
## Respond when the command is actually done.

export pash_speculative_command_id=$1

echo "STUB: This would call the scheduler for command with id: ${pash_speculative_command_id}"

## TODO: Set this based on what the scheduler returns
pash_runtime_final_status=$?

## TODO: Also need to use wrap_vars maybe to `set` properly etc
13 changes: 11 additions & 2 deletions compiler/pash.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ def main():
sys.exit(return_code)

def preprocess_and_execute_asts(input_script_path, args, input_script_arguments, shell_name):
mode = ast_to_ast.TransformationType('pash')
preprocessed_shell_script = preprocess(input_script_path, args, mode)
preprocessed_shell_script = preprocess(input_script_path, args)
if(args.output_preprocessed):
log("Preprocessed script:")
log(preprocessed_shell_script)
Expand Down Expand Up @@ -157,10 +156,20 @@ def parse_args():
help="DEPRECATED: instead of expanding using the internal expansion code, expand using a bash mirror process (slow)",
action="store_true")

## Set the preprocessing mode to PaSh
parser.set_defaults(preprocess_mode='pash')

config.add_common_arguments(parser)
args = parser.parse_args()
config.set_config_globals_from_pash_args(args)

## Modify the preprocess mode and the partial order file if we are in speculative mode
if args.speculative:
log("PaSh is running in speculative mode...")
args.__dict__["preprocess_mode"] = "spec"
args.__dict__["partial_order_file"] = ptempfile()
log(" -- Its partial order file will be stored in:", args.partial_order_file)

## Initialize the log file
config.init_log_file()
if not config.config:
Expand Down

0 comments on commit 7596f62

Please sign in to comment.