Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
c0b84fb
make commands with comments at the end work, by adding \n to the end …
SleepyMug Mar 6, 2024
50295c0
use the newest hs commit
SleepyMug Mar 6, 2024
45a2075
adding function into saved environment
SleepyMug Mar 6, 2024
03fa49a
Add timing option in tests
gliargovas Mar 8, 2024
bf70b7b
Add test for timed execution
gliargovas Mar 8, 2024
c9c5070
Add timed loop test
gliargovas Mar 8, 2024
7c04157
Add test_loop time
gliargovas Mar 8, 2024
784e7f0
Treat var assignments as safe nodes
gliargovas Mar 11, 2024
3d5cbc2
Receive var assignments from pash
gliargovas Mar 11, 2024
d7308c3
Receive var assignment info from frontend
gliargovas Mar 13, 2024
b60c31b
Parse var assignments from PO file
gliargovas Mar 13, 2024
1e3fe8f
Use most recent pash version
gliargovas Mar 13, 2024
d97c0d2
Add var assignment nodes to the PO
gliargovas Mar 13, 2024
70266f8
Create var assignment nodes based on the PO file
gliargovas Mar 13, 2024
c3b5e64
Add functions to execute variable assignments
gliargovas Mar 15, 2024
1318ee5
Var assignment class fix
gliargovas Mar 15, 2024
b1247c0
Find var assignments when creating concrete nodes
gliargovas Mar 15, 2024
65f4d7f
Use assignment nodes in speculation
gliargovas Mar 20, 2024
fb01312
adding comment test and function test
SleepyMug Mar 21, 2024
942f1fd
fixing env simulation in speculation
SleepyMug Mar 21, 2024
4a2084c
adding prefix to tmp file generation; fix couple of empty file trace …
SleepyMug Mar 21, 2024
dd95beb
update pash for cd
SleepyMug Mar 21, 2024
012da04
fix cd state saving/restoring
SleepyMug Mar 21, 2024
2f7298e
update pash for env spec
SleepyMug Mar 21, 2024
ec04834
parse pash changes for for loop speculation
SleepyMug Mar 22, 2024
d94a07e
adding efficient loop speculation
SleepyMug Mar 23, 2024
a26c2d3
updating pash
SleepyMug Mar 23, 2024
3361462
remove additional prints
SleepyMug Mar 23, 2024
c8e6d17
update pash
SleepyMug Mar 23, 2024
696b6e3
remove pdb
SleepyMug Mar 23, 2024
d469603
update timed loop test
SleepyMug Mar 23, 2024
ff62ed1
fix renameat2
SleepyMug Mar 23, 2024
23a7cd7
update test break to have things after the loop
SleepyMug Mar 23, 2024
cb91379
Merge branch 'main' into loop_support
SleepyMug Mar 23, 2024
0ae7205
fix dockerfile
ezrizhu Mar 24, 2024
8d77d1b
gracefully handle empty command and unsafe command
SleepyMug Mar 24, 2024
35ae059
Merge remote-tracking branch 'origin/loop_support' into loop_support
SleepyMug Mar 24, 2024
ebff1d4
potentially fix trace parsing
SleepyMug Mar 25, 2024
f39f433
aggressively reset speculation on env conflict
SleepyMug Mar 25, 2024
80e4ddc
fix pure assignment node followed by comment causing problem
SleepyMug Mar 26, 2024
6326fe9
fix assignment script for real
SleepyMug Mar 26, 2024
5f71f88
fix parsing
SleepyMug Mar 27, 2024
4f10651
add assertion for env file
SleepyMug Mar 27, 2024
a2ca1ce
new version of try
SleepyMug Mar 28, 2024
2a1d600
remove timed test in automated test
SleepyMug Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN python3 -m venv .venv
RUN source .venv/bin/activate
ENV PASH_SPEC_TOP=/srv/hs
ENV PASH_TOP=/srv/hs/deps/pash
RUN git submodule update --init --recursive --remote
RUN git submodule update --init --recursive
WORKDIR /srv/hs/deps/try
RUN ./setup.sh
WORKDIR /srv/hs/deps/pash
Expand Down
2 changes: 1 addition & 1 deletion deps/pash
2 changes: 1 addition & 1 deletion deps/try
Submodule try updated 1 files
+5 −0 try
7 changes: 7 additions & 0 deletions parallel-orch/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ def is_node_safe(node: CommandNode, variables: dict) -> str:
## a command substitution or a primitive.
## If so, then we need to tell the original script to execute the command.

## We are dealing with a var assignment
## Currently if treated as unsafe, it causes test_if to fail,
## so, for now, we treat them as safe.
## This adds some overhead because we create an overlay for each assignment.
if (len(node.arguments) == 0):
return True

## Expand the command argument
cmd_arg = node.arguments[0]
exp_state = expand.ExpansionState(variables)
Expand Down
16 changes: 12 additions & 4 deletions parallel-orch/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@
# and traces them with Riker.
# All commands are run inside an overlay sandbox.

def run_assignment_and_return_env_file(assignment: str, pre_execution_env_file: str):
    """Run an assignment through run_assignment.sh and return the post-env file path.

    Args:
        assignment: Text of the assignment command; passed to the script as
            its first argument.
        pre_execution_env_file: Path of the env file passed to the script as
            its second argument.

    Returns:
        Path of a freshly created temp file that is passed to the script as
        its third argument (presumably where the script dumps the resulting
        environment -- confirm against run_assignment.sh).
    """
    post_execution_env_file = util.ptempfile(prefix='hs_assignment_post_env')
    logging.debug(f'Running assignment: {assignment} | pre_execution_env_file: {pre_execution_env_file} | post_execution_env_file: {post_execution_env_file}')
    run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_assignment.sh'
    args = ["/bin/bash", run_script, assignment, pre_execution_env_file, post_execution_env_file]
    process = subprocess.run(args, stderr=subprocess.PIPE, text=True)
    if process.returncode != 0:
        # stderr is captured above; surface it instead of dropping it so a
        # broken assignment is visible in the logs. The path is still
        # returned so callers keep their best-effort behaviour.
        logging.warning(f'Assignment script exited with {process.returncode} '
                        f'for: {assignment} | stderr: {process.stderr}')
    return post_execution_env_file

def async_run_and_trace_command_return_trace(command, concrete_node_id, execution_id, pre_execution_env_file, speculate_mode=False):
trace_file = util.ptempfile()
stdout_file = util.ptempfile()
stderr_file = util.ptempfile()
post_execution_env_file = util.ptempfile()
trace_file = util.ptempfile(prefix='hs_trace')
stdout_file = util.ptempfile(prefix='hs_stdout')
stderr_file = util.ptempfile(prefix='hs_stderr')
post_execution_env_file = util.ptempfile(prefix='hs_post_env')
sandbox_dir, tmp_dir = util.create_sandbox()
logging.debug(f'Scheduler: Stdout file for: {concrete_node_id} is: {stdout_file}')
logging.debug(f'Scheduler: Stderr file for: {concrete_node_id} is: {stderr_file}')
Expand Down
187 changes: 163 additions & 24 deletions parallel-orch/node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from itertools import chain
from enum import Enum, auto
import logging
import re
Expand Down Expand Up @@ -133,19 +132,93 @@ def __repr__(self):
def __eq__(self, other):
return self.loops == other.loops


class HSLoopListContext:
    """Stack of loop lists, one entry per enclosing loop.

    `push` and `pop` never modify the receiver; each returns a new
    context built from a copy of the underlying stack.
    """

    def __init__(self, loop_list_context=None):
        # Build the default inside the call so contexts never share one list.
        self.loop_list_context = [] if loop_list_context is None else loop_list_context

    def push(self, loop_list):
        """Return a new context with `loop_list` added on top."""
        return HSLoopListContext([*self.loop_list_context, loop_list])

    def get_ith(self, i):
        """Placeholder with no implementation yet; always returns None."""
        return None

    def get_top(self):
        """Return a copy of the loop list on top of the stack."""
        top = self.loop_list_context[-1]
        return top[:]

    def pop(self):
        """Return a new context without the topmost loop list."""
        remaining = list(self.loop_list_context)
        remaining.pop()
        return HSLoopListContext(remaining)

def get_loop_list_from_env(env):
    """Read the env file at path `env` and return HS_LOOP_LIST as a list of words.

    The file content is parsed with util.parse_env_string_to_dict and the
    value stored under 'HS_LOOP_LIST' is split on whitespace.
    """
    with open(env) as env_file:
        env_vars = util.parse_env_string_to_dict(env_file.read())
    return env_vars['HS_LOOP_LIST'].split()

@dataclass
class Node:
    """Abstract command node of the program.

    Besides the command text and its ASTs, a node records whether it is a
    variable assignment and whether it changes HS_LOOP_LIST (a push via
    `HS_LOOP_LIST=...` or a pop via `unset ...`).
    """
    id_: NodeId
    cmd: str
    asts: "list[AstNode]"
    basic_block_id: int
    assignment: bool
    loop_list_change: bool

    def __init__(self, id_, cmd, asts, basic_block_id, var_assignment, loop_list_change):
        self.id_ = id_
        self.cmd = cmd
        self.asts = asts
        self.basic_block_id = basic_block_id
        self.assignment = var_assignment
        self.loop_list_change = loop_list_change

    def is_assignment(self):
        """Return True when this node is a variable assignment."""
        return self.assignment

    def is_loop_list_change(self):
        """Return True when this node pushes or pops the loop list."""
        return self.loop_list_change

    def is_loop_list_push(self):
        """Return True for a loop-list change that sets HS_LOOP_LIST."""
        return self.loop_list_change and self.cmd.startswith('HS_LOOP_LIST=')

    def is_loop_list_pop(self):
        """Return True for a loop-list change that is an `unset` command."""
        return self.loop_list_change and self.cmd.startswith('unset')

    def pretty_format(self):
        """Return the stripped command followed by its flags and node id.

        'q' marks an assignment node and 'l' marks a loop-list change.
        """
        assignment_flag = 'q' if self.assignment else ''
        loop_flag = 'l' if self.loop_list_change else ''
        return self.cmd.strip() + f' --- {assignment_flag}{loop_flag} {self.id_}@'

    def simulate_env(self, env):
        """Run this node's command as an assignment against env file `env`.

        Returns whatever executor.run_assignment_and_return_env_file returns
        (the path of the post-execution env file).
        """
        return executor.run_assignment_and_return_env_file(self.cmd, env)

    def simulate_loop_list(self, env, loop_list_context: 'HSLoopListContext'):
        """Return the loop-list context that results from this node.

        A pop node returns `loop_list_context.pop()`. Any other loop-list
        change simulates the assignment on `env`, reads HS_LOOP_LIST from
        the resulting env file and pushes it.
        """
        assert self.loop_list_change
        # Use the same predicate as is_loop_list_pop() instead of an exact
        # string comparison, so a command carrying trailing whitespace or a
        # newline is still recognised as a pop.
        if self.is_loop_list_pop():
            return loop_list_context.pop()
        new_env = self.simulate_env(env)
        return loop_list_context.push(get_loop_list_from_env(new_env))

def loop_iters_do_action(loop_iters, edge_type: 'CFGEdgeType'):
    """Return a new list of loop iteration counters after following an edge.

    The input is copied, never mutated. Index 0 is the counter that the
    edges operate on:
      * LOOP_BACK increments it,
      * LOOP_BEGIN inserts a new counter starting at 1 in front of it,
      * LOOP_SKIP and LOOP_END remove it.
    Any other edge type leaves the counters unchanged.
    """
    updated = list(loop_iters)
    if edge_type == CFGEdgeType.LOOP_BACK:
        updated[0] += 1
    elif edge_type == CFGEdgeType.LOOP_BEGIN:
        updated.insert(0, 1)
    elif edge_type in (CFGEdgeType.LOOP_SKIP, CFGEdgeType.LOOP_END):
        updated.pop(0)
    return updated

class ConcreteNodeId:
def __init__(self, node_id: NodeId, loop_iters = list()):
self.node_id = node_id
Expand Down Expand Up @@ -197,10 +270,24 @@ class ConcreteNode:
# This can only be set while in the frontier and the background node execution is enabled
# TODO: For now ignore this. Maybe there is a better way to do this.
# background_sandbox: Sandbox

# Exists when the node is in EXE or SPEC_EXE or after those states
exec_ctxt: ExecCtxt

# Exists when the node is in COMMITTED or SPEC_F
exec_result: ExecResult

def __init__(self, cnid: ConcreteNodeId, node: Node):
# Updated when the node is loop changing and the node is transitioning
# into COMMITTED or SPEC_F
loop_list_context: HSLoopListContext

spec_pre_env: str

# Exists when node is in READY
assignments: "list[NodeId]"

def __init__(self, cnid: ConcreteNodeId, node: Node, loop_list_context: HSLoopListContext,
spec_pre_env=None):
self.cnid = cnid
self.abstract_node = node
self.state = NodeState.INIT
Expand All @@ -210,6 +297,8 @@ def __init__(self, cnid: ConcreteNodeId, node: Node):
self.to_be_resolved_snapshot = None
self.exec_ctxt = None
self.exec_id = None
self.spec_pre_env = spec_pre_env
self.loop_list_context = loop_list_context

def __str__(self):
return f'Node(id:{self.id_}, cmd:{self.cmd}, state:{self.state}, rwset:{self.rwset}, to_be_resolved_snapshot:{self.to_be_resolved_snapshot}, wait_env_file:{self.wait_env_file}, exec_ctxt:{self.exec_ctxt})'
Expand Down Expand Up @@ -270,17 +359,27 @@ def execution_outcome(self) -> Tuple[int, str, str]:
return self.exec_result.exit_code, self.exec_ctxt.post_env_file, self.exec_ctxt.stdout

def command_unsafe(self):
    """Return True when this node's command must be treated as unsafe.

    An empty AST list is reported as unsafe without consulting the
    analysis; otherwise the result is the negation of
    analysis.safe_to_execute on the ASTs with an empty variable map.
    """
    has_no_asts = len(self.asts) == 0
    return has_no_asts or not analysis.safe_to_execute(self.asts, {})


def update_loop_list_context(self):
    """Refresh this node's loop-list context after its command has run.

    Nothing happens unless the abstract node is marked as a loop-list
    change. A pop node drops the topmost loop list; any other loop-list
    change reads HS_LOOP_LIST from the sandboxed post-execution env file
    and pushes it.
    """
    node = self.abstract_node
    if not node.is_loop_list_change():
        return
    if node.is_loop_list_pop():
        # Same pop/push split as Node.simulate_loop_list. After an `unset`
        # the env file presumably no longer holds HS_LOOP_LIST, so reading
        # it back and pushing would be wrong for a pop.
        self.loop_list_context = self.loop_list_context.pop()
        return
    real_env_path = util.sandboxed_path(self.exec_ctxt.sandbox_dir,
                                        self.exec_ctxt.post_env_file)
    new_loop_list = get_loop_list_from_env(real_env_path)
    self.loop_list_context = self.loop_list_context.push(new_loop_list)

## ##
## Transition Functions ##
## ##

def transition_from_init_to_ready(self):
def transition_from_init_to_ready(self, spec_pre_env):
assert self.state == NodeState.INIT
self.state = NodeState.READY
self.rwset = RWSet(set(), set())
self.spec_pre_env = spec_pre_env
# self.spec_pre_env = ConcreteAssignmentNode.execute_assignments_and_get_most_recent_spec_pre_env(assignments)
# Also, probably unroll here?

def transition_from_ready_to_unsafe(self):
Expand All @@ -296,8 +395,8 @@ def try_reset_to_ready(self):
return
else:
self.reset_to_ready()
def reset_to_ready(self):

def reset_to_ready(self, spec_pre_env: str = None):
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING,
NodeState.SPECULATED]

Expand All @@ -318,15 +417,17 @@ def reset_to_ready(self):

self.exec_ctxt = None
self.exec_result = None
if spec_pre_env is not None:
self.spec_pre_env = spec_pre_env
self.state = NodeState.READY


def start_executing(self, env_file):
    """Start the node's command with `env_file` and move READY -> EXECUTING.

    Asserts that the node is currently READY.
    """
    ready = self.state == NodeState.READY
    assert ready
    self.start_command(env_file)
    self.state = NodeState.EXECUTING

def start_spec_executing(self, env_file):
    """Start the command speculatively and move READY -> SPEC_EXECUTING.

    Same as a normal start except that start_command is invoked with
    speculate=True. Asserts that the node is currently READY.
    """
    ready = self.state == NodeState.READY
    assert ready
    self.start_command(env_file, speculate=True)
    self.state = NodeState.SPEC_EXECUTING
Expand All @@ -336,17 +437,18 @@ def commit_frontier_execution(self):
self.exec_ctxt.process.wait()
self.exec_result = ExecResult(self.exec_ctxt.process.returncode, self.exec_ctxt.process.pid)
self.gather_fs_actions()
self.update_loop_list_context()
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
self.state = NodeState.COMMITTED

def finish_spec_execution(self):
    """Wait for the speculative process and move SPEC_EXECUTING -> SPECULATED.

    Records the exit code and pid in exec_result, then refreshes the
    loop-list context and gathers the filesystem actions, in that order.
    """
    assert self.state == NodeState.SPEC_EXECUTING
    proc = self.exec_ctxt.process
    proc.wait()
    self.exec_result = ExecResult(proc.returncode, proc.pid)
    self.update_loop_list_context()
    self.gather_fs_actions()
    self.state = NodeState.SPECULATED


def commit_speculated(self):
assert self.state == NodeState.SPECULATED
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
Expand Down Expand Up @@ -406,13 +508,18 @@ def has_env_conflict_with(self, other_env) -> bool:
"CMD_ID", "STDOUT_FILE", "DIRSTACK", "SECONDS", "TMPDIR",
"UPDATED_DIRS_AND_MOUNTS", "EPOCHSECONDS", "LATEST_ENV_FILE",
"TRY_COMMAND", "SRANDOM", "speculate_flag", "EXECUTION_ID",
"EPOCHREALTIME", "OLDPWD", "exit_code",
"EPOCHREALTIME", "OLDPWD", "exit_code", "BASHPID", "BASH_COMMAND", "BASH_ARGV0",
"cmd", "BASH_ARGC", "BASH_ARGV", "BASH_SUBSHELL", "LINENO", "GROUPS", "BASH_SOURCE",
"PREVIOUS_SHELL_EC", "pash_previous_exit_status", "filter_vars_file", "pash_spec_loop_id",
"pash_loop_iters",
])


ignore_prefix = "pash_loop_"

re_scalar_string = re.compile(r'declare (?:-x|--)? (\w+)="([^"]*)"')
re_scalar_int = re.compile(r'declare -i (\w+)="(\d+)"')
re_array = re.compile(r'declare -a (\w+)=(\([^)]+\))')
re_fn = re.compile(r'declare -fx (\w+)=(\([^)]+\))')

def parse_env(content):
env_vars = {}
Expand All @@ -423,8 +530,25 @@ def parse_env(content):
match = regex.match(line)
if match:
key, value = match.groups()
if key not in ignore_vars:
if key not in ignore_vars and not key.startswith(ignore_prefix):
env_vars[key] = value
break
inside_function = False
current_function = ''
function_body_lines = []
for line in content.splitlines():
if line.startswith('#') or not line.strip():
continue
if not inside_function and not line.startswith('declare') and line.endswith('() '):
inside_function = True
current_function = line[:-len(' () ')]
elif inside_function:
function_body_lines.append(line)
if line == '}':
inside_function = False
if not current_function in ignore_vars:
env_vars[current_function] = '\n'.join(function_body_lines)
function_body_lines = []
return env_vars

with open(self.exec_ctxt.pre_env_file, 'r') as file:
Expand All @@ -434,7 +558,7 @@ def parse_env(content):
other_env_vars = parse_env(file.read())

logging.debug(f"Comparing env files {self.exec_ctxt.pre_env_file} and {other_env}")

conflict_exists = False
for key in set(node_env_vars.keys()).union(other_env_vars.keys()):
if key not in node_env_vars:
Expand All @@ -459,14 +583,14 @@ class CFGEdgeType(Enum):
LOOP_BEGIN = auto()
LOOP_END = auto()
OTHER = auto()

class HSBasicBlock:
def __init__(self, bb_id: int, nodes: list[Node]):
self.bb_id = bb_id
self.nodes = nodes

def __str__(self):
return ''.join([node.cmd.strip() + '\n' for node in self.nodes])
return ''.join([node.pretty_format() + '\n' for node in self.nodes])

@property
def loop_context(self):
Expand All @@ -491,8 +615,8 @@ def __init__(self, basic_blocks: list, block_edges: list[tuple]):
for bb_id in range(len(basic_blocks)):
self.block_adjacency[bb_id] = {}

for from_bb, to_bb, edge_type in block_edges:
self.block_adjacency[from_bb][to_bb] = CFGEdgeType[edge_type]
for from_bb, to_bb, edge_type, aux_info in block_edges:
self.block_adjacency[from_bb][to_bb] = (CFGEdgeType[edge_type], aux_info)

def is_start_of_block(self, node_id: NodeId):
for bb in self.basic_blocks:
Expand All @@ -519,17 +643,31 @@ def is_last_block(self, bb: HSBasicBlock):
else:
return False

def guess_next_block(self, bb: HSBasicBlock, loop_iters: list,
                     loop_list_context: HSLoopListContext):
    """Pick the outgoing edge of `bb` to follow.

    Args:
        bb: Basic block whose successor is wanted.
        loop_iters: Loop iteration counters; index 0 is compared against
            the length of the topmost loop list.
        loop_list_context: Current stack of loop lists.

    Returns:
        A tuple (edge_type, next_block, aux_info) for the chosen edge.

    Selection order:
      * a LOOP_BEGIN edge must be the only edge and is always followed;
      * with LOOP_SKIP/LOOP_TAKEN edges, LOOP_SKIP is chosen once the
        topmost loop list is shorter than loop_iters[0], else LOOP_TAKEN;
      * otherwise the first present edge among LOOP_END, LOOP_BACK,
        IF_TAKEN, ELSE_TAKEN, OTHER.

    Raises:
        AssertionError: if no edge can be selected.
    """
    bb_id = self.basic_blocks.index(bb)
    pick_dict = {}
    for next_bb_id, (edge_type, aux_info) in self.block_adjacency[bb_id].items():
        pick_dict[edge_type] = (next_bb_id, aux_info)

    def pick(chosen):
        # Always read the target and aux_info of the edge actually chosen,
        # never of whichever edge the adjacency loop happened to visit last.
        target_id, chosen_aux = pick_dict[chosen]
        return chosen, self.basic_blocks[target_id], chosen_aux

    if CFGEdgeType.LOOP_BEGIN in pick_dict:
        assert len(pick_dict) == 1
        return pick(CFGEdgeType.LOOP_BEGIN)
    if CFGEdgeType.LOOP_SKIP in pick_dict:
        assert CFGEdgeType.LOOP_TAKEN in pick_dict
        if len(loop_list_context.get_top()) < loop_iters[0]:
            return pick(CFGEdgeType.LOOP_SKIP)
        return pick(CFGEdgeType.LOOP_TAKEN)
    for candidate in (CFGEdgeType.LOOP_END, CFGEdgeType.LOOP_BACK,
                      CFGEdgeType.IF_TAKEN, CFGEdgeType.ELSE_TAKEN,
                      CFGEdgeType.OTHER):
        if candidate in pick_dict:
            return pick(candidate)
    raise AssertionError(f'no outgoing edge to follow from block {bb_id}')

def find_node(self, node_id):
Expand All @@ -542,3 +680,4 @@ def find_node(self, node_id):
def __str__(self):
return 'prog:\n' + '\n'.join(
[f'block {i}:\n' + str(bb) + f'goto block {self.block_adjacency[i]}\n' for i, bb in enumerate(self.basic_blocks)])

Loading