Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
scripts/vars.sh
*__pycache__*
.DS_Store
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
path = deps/riker
url = https://github.com/angelhof/riker.git
branch = dyn-par-investigation
[submodule "deps/pash"]
path = deps/pash
url = https://github.com/binpash/pash.git
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

A dynamic parallelizer that optimistically/speculatively executes everything in a script in parallel and ensures that it executes correctly by tracing it and reexecuting the parts that were erroneous.

## Installing

```sh
./scripts/install_deps_ubuntu20.sh
```

## Tests

To run the tests:
```sh
cd test
./test_orch.sh
```

### TODO Items

Expand Down
1 change: 1 addition & 0 deletions deps/pash
Submodule pash added at 185787
4 changes: 4 additions & 0 deletions parallel-orch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@
## Ensure that PASH_TMP_PREFIX is set by pa.sh
assert(not os.getenv('PASH_SPEC_TMP_PREFIX') is None)
PASH_SPEC_TMP_PREFIX = os.getenv('PASH_SPEC_TMP_PREFIX')

SOCKET_BUF_SIZE = 8192

SCHEDULER_SOCKET = os.getenv("PASH_SPEC_SCHEDULER_SOCKET")
11 changes: 10 additions & 1 deletion parallel-orch/executor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import config
import subprocess
import util

# This module executes a sequence of commands
# and traces them with Riker.
# Commands [1:N] are run inside an overlay sandbox.

def async_run_and_trace_command(command, trace_file, sandbox_mode=False):
def async_run_and_trace_command_return_trace(command, node_id, sandbox_mode=False):
    ## Allocate a fresh temporary file to receive Riker's trace output, then
    ## launch the command asynchronously via the sibling helper.
    ## Returns the (still running) Popen handle together with the trace path;
    ## the caller is expected to read the trace later (presumably after the
    ## process signals completion -- confirm with caller).
    trace_file = util.ptempfile()
    process = async_run_and_trace_command(command, trace_file, node_id, sandbox_mode)
    return process, trace_file

def async_run_and_trace_command(command, trace_file, node_id, sandbox_mode=False):
## Call Riker to execute the command
run_script = f'{config.PASH_SPEC_TOP}/parallel-orch/run_command.sh'
args = ["/bin/bash", run_script, command, trace_file]
Expand All @@ -15,7 +21,10 @@ def async_run_and_trace_command(command, trace_file, sandbox_mode=False):
else:
# print(" -- Standard mode")
args.append("standard")
args.append(str(node_id))
process = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# For debugging
# process = subprocess.Popen(args)
return process

def async_run_and_trace_command_in_sandbox(command, trace_file):
Expand Down
116 changes: 100 additions & 16 deletions parallel-orch/partial_program_order.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logging

import executor
import trace

class Node:
Expand All @@ -10,6 +13,10 @@ def __str__(self):
# return f"ID: {self.id}\nCMD: {self.cmd}\nR: {self.read_set}\nW: {self.write_set}"
return self.cmd

def __repr__(self):
    """Compact debug form: the wrapped command text, tagged as N(...)."""
    return f'N({self.cmd})'

def get_cmd(self) -> str:
return self.cmd

Expand Down Expand Up @@ -51,8 +58,10 @@ def __init__(self, nodes, edges):
## Nodes that are in the frontier can only move to committed
self.frontier = self.get_source_nodes()
self.speculated = set()
self.rw_sets = {node_id: RWSet([], []) for node_id in self.nodes.keys()}
self.rw_sets = {node_id: None for node_id in self.nodes.keys()}
self.workset = []
## A dictionary from cmd_ids that are currently executing that contains their trace_files
self.commands_currently_executing = {}

def __str__(self):
    """One-line summary: node count plus the adjacency mapping."""
    node_count = len(self.nodes.keys())
    return f"NODES: {node_count} | ADJACENCY: {self.adjacency}"
Expand All @@ -66,7 +75,11 @@ def get_source_nodes(self) -> list:

def init_workset(self):
    ## Seed the workset with every node id that has not yet been committed.
    self.workset = self.get_all_non_committed()


## Check if the partial order is done
def is_completed(self) -> bool:
    """Return True when no non-committed nodes remain."""
    return not self.get_all_non_committed()

def get_workset(self) -> list:
    ## Plain accessor for the current workset (node ids scheduled for
    ## (re)execution -- see resolve_dependencies / init_workset).
    return self.workset

Expand Down Expand Up @@ -166,10 +179,14 @@ def resolve_dependencies(self):
# We look at the transitive closure instead of workset because we want to also check the speculated cmds that are not in the workset
for second_cmd_id in transitive_closure:
# If no anti-dependencies exist, we proceed to check for dependencies
if second_cmd_id not in new_workset and (self.has_backward_dependency(first_cmd_id, second_cmd_id) or self.has_write_dependency(first_cmd_id, second_cmd_id) or self.has_forward_dependency(first_cmd_id, second_cmd_id)):
new_workset.append(second_cmd_id)
else:
pass
## TODO: We need to keep track of invalidations continuously, which is non trivial!
if second_cmd_id not in new_workset:
## If it is None, it means that it has not executed at all,
## so we need to add it in the workset
if self.get_rw_set(second_cmd_id) is None:
new_workset.append(second_cmd_id)
elif self.has_backward_dependency(first_cmd_id, second_cmd_id) or self.has_write_dependency(first_cmd_id, second_cmd_id) or self.has_forward_dependency(first_cmd_id, second_cmd_id):
new_workset.append(second_cmd_id)
# Set the new speculated set

old_speculated = self.speculated.copy()
Expand Down Expand Up @@ -212,17 +229,38 @@ def move_frontier_forward(self, old_speculated: set):
self.frontier = new_frontier

def get_next_non_speculated(self, start, old_speculated: set):
    ## Walk the successors of `start`: any node found in either the old or
    ## the current speculated set is promoted to committed and its own
    ## successors are explored; the first non-speculated node reached on
    ## each path is collected and returned.
    ## NOTE(review): there is no visited set -- in a DAG a node reachable
    ## via multiple paths may be processed more than once; confirm the
    ## graphs are trees or that re-promotion is benign (committed is a set).
    traversal_workset = self.get_next(start)
    next_non_speculated = []
    while len(traversal_workset) > 0:
        node_id = traversal_workset.pop()
        if node_id in old_speculated.union(self.speculated):
            ## Promote: it was speculatively executed, so commit it and keep
            ## walking past it toward the next non-speculated nodes.
            self.speculated.discard(node_id)
            self.committed.add(node_id)
            traversal_workset.extend(self.get_next(node_id))
        else:
            next_non_speculated.append(node_id)
    return next_non_speculated

## Run a command and add it to the dictionary of executing ones
def run_cmd_non_blocking(self, node_id: int):
    ## TODO: A command should only be run if it's in the frontier, otherwise it should be spec run
    ## NOTE(review): `assert` is stripped under `python -O`; consider raising
    ## an explicit exception if this is a real precondition.
    assert(self.is_frontier(node_id))
    node = self.get_node(node_id)
    cmd = node.get_cmd()
    logging.debug(f'Running command: {node_id} {self.get_node(node_id)}')
    ## Fire-and-forget: the Popen handle is deliberately dropped; completion
    ## is signaled externally (the run script messages the scheduler socket).
    _proc, trace_file = executor.async_run_and_trace_command_return_trace(cmd, node_id)
    logging.debug(f'Read trace from: {trace_file}')
    ## Remember where this node's trace will land so that
    ## command_execution_completed() can pick it up later.
    self.commands_currently_executing[node_id] = trace_file

def command_execution_completed(self, node_id: int):
    """Finish bookkeeping for a command that has completed execution.

    Removes the node from the currently-executing map, parses its trace
    into a read/write set, records that set, and re-resolves dependencies.
    """
    trace_path = self.commands_currently_executing.pop(node_id)
    trace_object = executor.read_trace(trace_path)
    reads, writes = trace.parse_and_gather_cmd_rw_sets(trace_object)
    self.update_rw_set(node_id, RWSet(reads, writes))
    self.resolve_dependencies()


def log_rw_sets(self, logging):
logging.debug("====== |RW Sets| ======")
Expand All @@ -232,9 +270,55 @@ def log_rw_sets(self, logging):
logging.debug(f"Write: {rw_set.get_write_set()}")

def log_partial_program_order_info(self):
    """Log the current scheduling state (workset/committed/frontier/speculated)."""
    banner = "=" * 60
    logging.debug(banner)
    logging.debug(f"WORKSET:{self.get_workset()}")
    logging.debug(f"COMMITTED:{self.get_committed()}")
    logging.debug(f"FRONTIER:{self.get_frontier()}")
    logging.debug(f"SPECULATED:{self.get_speculated()}")
    logging.debug(banner)

def parse_cmd_from_file(file_path: str) -> str:
    """Return the full command text stored in the given file."""
    with open(file_path) as cmd_file:
        return cmd_file.read()

def parse_edge_line(line: str) -> "tuple[int, int]":
    """Parse an edge description of the form "<from> -> <to>" into an int pair."""
    src_str, dst_str = line.split(" -> ")
    return (int(src_str), int(dst_str))


def parse_partial_program_order_from_file(file_path: str) -> PartialProgramOrder:
    """Parse a partial-program-order description file into a PartialProgramOrder.

    Expected file layout (after dropping comment lines starting with '#'):
      - first line:   directory that holds one command file per node,
                      named 0 .. N-1
      - middle lines: edges of the form "<from> -> <to>"
      - last line:    the number of nodes N
    """
    with open(file_path) as f:
        raw_lines = f.readlines()

    ## Filter comments AND blank lines, and strip trailing whitespace.
    ## Fix: blank lines (e.g. a trailing newline at EOF) previously survived
    ## the filter and made int(lines[-1]) below raise ValueError.
    lines = [line.rstrip() for line in raw_lines
             if line.strip() and not line.startswith("#")]

    ## The first line is the directory in which cmd_files are
    cmds_directory = str(lines[0])
    logging.debug(f'Cmds are stored in: {cmds_directory}')

    ## The last line is the number of nodes
    number_of_nodes = int(lines[-1])
    logging.debug(f'Number of po cmds: {number_of_nodes}')

    ## The rest of the lines are edge_lines
    edge_lines = lines[1:-1]
    logging.debug(f'Edges: {edge_lines}')

    ## Node i's command text lives in <cmds_directory>/<i>
    nodes = {}
    for i in range(number_of_nodes):
        cmd = parse_cmd_from_file(f'{cmds_directory}/{i}')
        nodes[i] = Node(i, cmd)

    ## Build the adjacency structure; every node gets a (possibly empty) list
    edges = {i: [] for i in range(number_of_nodes)}
    for edge_line in edge_lines:
        from_id, to_id = parse_edge_line(edge_line)
        edges[from_id].append(to_id)

    return PartialProgramOrder(nodes, edges)
10 changes: 10 additions & 0 deletions parallel-orch/run_command.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
export CMD_STRING=${1?No command was given to execute}
export TRACE_FILE=${2?No trace file path given}
export EXEC_MODE=${3?No execution mode given}
export CMD_ID=${4?No command id given}

source "$PASH_TOP/compiler/orchestrator_runtime/speculative/pash_spec_init_setup.sh"

if [ "sandbox" == "$EXEC_MODE" ]; then
export sandbox_flag=1
Expand All @@ -23,4 +26,11 @@ else
"${PASH_SPEC_TOP}/parallel-orch/template_script_to_execute_in_overlay.sh"
fi

## Send a message to the scheduler socket
## Assumes "${PASH_SPEC_SCHEDULER_SOCKET}" is set and exported

## TODO: Pass the proper exit code
exit_code=0
msg="CommandExecComplete:${CMD_ID}|Exit code:${exit_code}"
daemon_response=$(pash_spec_communicate_scheduler_just_send "$msg") # Blocking step, daemon will not send response until it's safe to continue

Loading