Merge pull request #562 from binpash/future
Many fixes and OSDI artifact
angelhof committed May 27, 2022
2 parents (5ff19c4 + 99641e5), commit 5eac7b9
Showing 112 changed files with 5,285 additions and 375 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/tests.yaml
@@ -74,3 +74,33 @@ jobs:
run: |
# check if everything executed without errors
cd scripts/workflow && bash exit_code.sh
shellcheck:
runs-on: ubuntu-latest
if: github.event.pull_request.draft == false
steps:
- uses: actions/checkout@v3

- uses: ludeeus/action-shellcheck@master
env:
# Only check some field splitting problems now, but we should check
# others in the future.
SHELLCHECK_OPTS:
-i SC2046
-i SC2048
-i SC2053
-i SC2068
-i SC2086
-i SC2206
-i SC2254
with:
ignore_paths:
annotations
compiler/parser/libdash
compiler/tests
evaluation
python_pkgs
runtime/agg/cpp/tests
scripts
ignore_names:
test_evaluation_scripts.sh
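(Aside, not part of the diff: all of the enabled codes target unquoted expansions that undergo field splitting and globbing. A minimal sketch of the bug class, using a hypothetical filename:)

    # SC2086: an unquoted variable expansion is field-split and glob-expanded.
    file="my notes.txt"
    cat $file      # passes two arguments: `my` and `notes.txt`
    cat "$file"    # passes one argument, as intended

    # SC2046: the same hazard with an unquoted command substitution.
    rm $(find . -name '*.tmp')     # breaks on paths containing whitespace
    find . -name '*.tmp' -delete   # a safer equivalent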
9 changes: 5 additions & 4 deletions README.md
@@ -3,10 +3,11 @@
> _A system for parallelizing POSIX shell scripts._
> _Hosted by the [Linux Foundation](https://linuxfoundation.org/press-release/linux-foundation-to-host-the-pash-project-accelerating-shell-scripting-with-automated-parallelization-for-industrial-use-cases/)._
| Service | Master | Develop |

| Service | Main | Develop |
| :--- | :----: | :----: |
| Tests | [![Tests](https://github.com/binpash/pash/actions/workflows/tests.yaml/badge.svg?branch=main&event=push)](https://github.com/binpash/pash/actions/workflows/tests.yaml) | [![Tests](https://github.com/binpash/pash/actions/workflows/tests.yaml/badge.svg?branch=future&event=push)](https://github.com/binpash/pash/actions/workflows/tests.yaml) |
| Build | [![Build](https://github.com/binpash/pash/actions/workflows/build.yaml/badge.svg?branch=main&event=push)](https://github.com/binpash/pash/actions/workflows/build.yaml) | [![Build](https://github.com/binpash/pash/actions/workflows/build.yaml/badge.svg?branch=future&event=push)](https://github.com/binpash/pash/actions/workflows/build.yaml) |
| Tests | [![Tests](https://github.com/binpash/pash/actions/workflows/tests.yaml/badge.svg?branch=main&event=push)](https://github.com/binpash/pash/actions/workflows/tests.yaml?query=branch%3Amain++) | [![Tests](https://github.com/binpash/pash/actions/workflows/tests.yaml/badge.svg?branch=future&event=push)](https://github.com/binpash/pash/actions/workflows/tests.yaml?query=branch%3Afuture++) |
| Build | [![Build](https://github.com/binpash/pash/actions/workflows/build.yaml/badge.svg?branch=main&event=push)](https://github.com/binpash/pash/actions/workflows/build.yaml?query=branch%3Amain++) | [![Build](https://github.com/binpash/pash/actions/workflows/build.yaml/badge.svg?branch=future&event=push)](https://github.com/binpash/pash/actions/workflows/build.yaml?query=branch%3Afuture++) |
| Pages | [![DeployPublish](https://github.com/binpash/web/actions/workflows/pages.yaml/badge.svg)](https://github.com/binpash/web/actions/workflows/pages.yaml) | [![DeployPublish](https://github.com/binpash/web/actions/workflows/pages.yaml/badge.svg)](https://github.com/binpash/web/actions/workflows/pages.yaml) |


@@ -45,7 +46,7 @@ This repo hosts the core `pash` development. The structure is as follows:
* [compiler](./compiler): Shell-dataflow translations and associated parallelization transformations.
* [docs](./docs): Design documents, tutorials, installation instructions, etc.
* [evaluation](./evaluation): Shell pipelines and example [scripts](./evaluation/other/more-scripts) used for the evaluation.
* [runtime](./runtime): Runtime component — e.g., `eager`, `split`, and assocaited combiners.
* [runtime](./runtime): Runtime component — e.g., `eager`, `split`, and associated combiners.
* [scripts](./scripts): Scripts related to continuous integration, deployment, and testing.

## Community & More
7 changes: 6 additions & 1 deletion annotations/custom_sort.json
@@ -17,7 +17,12 @@
"class": "parallelizable_pure",
"properties": ["commutative"],
"inputs": ["stdin"],
"outputs": ["stdout"]
"outputs": ["stdout"],
"aggregator":
{
"name": "sort",
"options": ["-m"]
}
}
],
"comment": "TODO: This is not correct. To assign arguments correctly, we need to check if there is any file argument. If not, then we can read from stdin, otherwise we have to read from input files."
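(Aside, not part of the diff: the new aggregator entry tells PaSh how to combine the outputs of parallel `sort` instances, namely with `sort -m`, which merges already-sorted inputs. A rough sketch of the idea, with hypothetical file names rather than PaSh's actual generated code:)

    sort half1.txt > s1 &
    sort half2.txt > s2 &
    wait
    sort -m s1 s2   # merging sorted runs gives the same output as `sort half1.txt half2.txt`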
12 changes: 12 additions & 0 deletions annotations/dfs_split_reader.json
@@ -0,0 +1,12 @@
{
"command": "dfs_split_reader.sh",
"cases":
[
{
"predicate": "default",
"class": "pure",
"inputs": ["args[0]"],
"outputs": ["stdout"]
}
]
}
23 changes: 23 additions & 0 deletions annotations/hdfs.json
@@ -0,0 +1,23 @@
{
"command": "hdfs",
"cases":
[
{
"predicate":
{
"operator": "exists",
"operands": ["-cat"]
},
"class": "stateless",
"inputs": ["args[1]"],
"outputs": ["stdout"],
"comments": "This represents hdfs dfs -cat <path>. Slightly hacky since we only check for -cat"
},
{
"predicate": "default",
"class": "side-effectful",
"inputs": ["stdin"],
"outputs": ["stdout"]
}
]
}
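(Aside, not part of the diff: the first case matches invocations containing `-cat`, i.e. `hdfs dfs -cat <path>`, which only streams a file to stdout and can therefore act as a stateless dataflow source; every other `hdfs` invocation falls through to the conservative side-effectful default. Illustrative examples:)

    hdfs dfs -cat /data/input.txt | grep pattern   # first case: a stateless source
    hdfs dfs -put local.txt /data/                 # default case: side-effectful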
12 changes: 12 additions & 0 deletions annotations/remote_read.json
@@ -0,0 +1,12 @@
{
"command": "remote_read.sh",
"cases":
[
{
"predicate": "default",
"class": "pure",
"inputs": [],
"outputs": ["stdout"]
}
]
}
12 changes: 12 additions & 0 deletions annotations/remote_write.json
@@ -0,0 +1,12 @@
{
"command": "remote_write.sh",
"cases":
[
{
"predicate": "default",
"class": "pure",
"inputs": ["stdin"],
"outputs": ["stdout"]
}
]
}
2 changes: 1 addition & 1 deletion annotations/seq.json
@@ -5,7 +5,7 @@
{
"predicate": "default",
"class": "pure",
"inputs": ["stdin"],
"inputs": [],
"outputs": ["stdout"]
}
]
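(Aside, not part of the diff: the fix reflects that `seq` generates output purely from its arguments and never reads stdin, so its annotation should declare no inputs:)

    seq 3             # prints 1, 2, 3 on separate lines
    echo hi | seq 3   # identical output; stdin is ignored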
9 changes: 9 additions & 0 deletions compiler/ast_to_ir.py
@@ -208,6 +208,15 @@ def compile_node_command(ast_node, fileIdGen, config):

## If there are no arguments, the command is just an
## assignment
##
## TODO: The if-branch of this conditional should never be possible since the preprocessor
## wouldn't replace a call without arguments (simple assignment).
##
## Also the return is not in the correct indentation so probably it never gets called
## in our tests.
##
## We should remove it and add the following assert:
## assert len(ast_node.arguments) > 0
if(len(ast_node.arguments) == 0):
## Just compile the assignments. Specifically compile the
## assigned values, because they might have command
3 changes: 3 additions & 0 deletions compiler/config.json
@@ -7,6 +7,9 @@
"r_wrap_binary": "runtime/r_wrap",
"r_unwrap_binary": "runtime/r_unwrap",
"dgsh_tee_binary": "runtime/dgsh_tee.sh",
"remote_read_binary": "runtime/dspash/remote_read.sh",
"remote_write_binary": "runtime/dspash/remote_write.sh",
"dfs_split_reader_binary": "runtime/dspash/dfs_split_reader.sh",
"clean_up_graph_binary": "runtime/wait_for_output_and_sigpipe_rest.sh",
"redirect_stdin_binary": "runtime/redirect_stdin_to.sh",
"immediate": "./.pash_immediate_command.sh",
10 changes: 9 additions & 1 deletion compiler/config.py
@@ -10,7 +10,7 @@
from util import *

## Global
__version__ = "0.7.2" # FIXME add libdash version
__version__ = "0.8" # FIXME add libdash version
GIT_TOP_CMD = [ 'git', 'rev-parse', '--show-toplevel', '--show-superproject-working-tree']
if 'PASH_TOP' in os.environ:
PASH_TOP = os.environ['PASH_TOP']
@@ -27,6 +27,8 @@

LOGGING_PREFIX = ""

HDFS_PREFIX = "$HDFS_DATANODE_DIR/"

config = {}
annotations = []
pash_args = None
@@ -146,6 +148,10 @@ def add_common_arguments(parser):
parser.add_argument("--daemon_communicates_through_unix_pipes",
help="(experimental) the daemon communicates through unix pipes instead of sockets",
action="store_true")
parser.add_argument("--distributed_exec",
help="(experimental) execute the script in a distributed environment. Remote machines should be configured and ready",
action="store_true",
default=False)
parser.add_argument("--config_path",
help="determines the config file path. By default it is 'PASH_TOP/compiler/config.yaml'.",
default="")
@@ -187,6 +193,8 @@ def pass_common_arguments(pash_arguments):
arguments.append(string_to_argument("--dgsh_tee"))
if (pash_arguments.no_daemon):
arguments.append(string_to_argument("--no_daemon"))
if (pash_arguments.distributed_exec):
arguments.append(string_to_argument("--distributed_exec"))
if (pash_arguments.parallel_pipelines):
arguments.append(string_to_argument("--parallel_pipelines"))
if (pash_arguments.daemon_communicates_through_unix_pipes):
2 changes: 1 addition & 1 deletion compiler/definitions/ir/arg.py
@@ -28,4 +28,4 @@ def to_ast(self):
def concatenate(self, other):
space = [['C', 32]]
self.arg_char_list.extend(space)
self.arg_char_list.extend(other.arg_char_list)
self.arg_char_list.extend(other.arg_char_list)
31 changes: 29 additions & 2 deletions compiler/definitions/ir/file_id.py
@@ -39,15 +39,22 @@ def __repr__(self):
return output

def serialize(self):
if(isinstance(self.resource, EphemeralResource)):
if(isinstance(self.resource, TemporaryFileResource)):
output = self.get_temporary_file_suffix()
elif(isinstance(self.resource, EphemeralResource)):
output = self.get_fifo_suffix()
else:
output = "{}".format(self.resource)
return output

def get_temporary_file_suffix(self):
tempfile_name = "{}{}".format(self.prefix, self.ident)
return tempfile_name

def get_fifo_suffix(self):
fifo_name = "{}#fifo{}".format(self.prefix, self.ident)
return fifo_name

## Serialize as an option for the JSON serialization when sent to
## the backend. This is needed as options can either be files or
## arguments, and in each case there needs to be a different
@@ -66,7 +73,11 @@ def to_ast(self, stdin_dash=False):
## check if a file id refers to a pipe
##
## TODO: I am not sure about the FileDescriptor resource
if(isinstance(self.resource, EphemeralResource)):
if(isinstance(self.resource, TemporaryFileResource)):
suffix = self.get_temporary_file_suffix()
string = os.path.join(config.PASH_TMP_PREFIX, suffix)
argument = string_to_argument(string)
elif(isinstance(self.resource, EphemeralResource)):
suffix = self.get_fifo_suffix()
string = os.path.join(config.PASH_TMP_PREFIX, suffix)
## Quote the argument
@@ -102,9 +113,15 @@ def has_file_resource(self):
def has_file_descriptor_resource(self):
return (isinstance(self.resource, FileDescriptorResource))

def has_remote_file_resource(self):
return isinstance(self.resource, RemoteFileResource)

def is_ephemeral(self):
return (isinstance(self.resource, EphemeralResource))

def make_temporary_file(self):
self.resource = TemporaryFileResource()

## Removes a resource from an FID, making it ephemeral
def make_ephemeral(self):
self.resource = EphemeralResource()
@@ -118,3 +135,13 @@ def isNull(self):

def get_ident(self):
return self.ident

def is_available_on(self, host):
if self.is_ephemeral():
return True
elif self.has_remote_file_resource():
return self.resource.is_available_on(host)
else:
# Currently any other resource types should
# be part of the main shell graph.
return False
28 changes: 28 additions & 0 deletions compiler/definitions/ir/nodes/dfs_split_reader.py
@@ -0,0 +1,28 @@
import os
from definitions.ir.dfg_node import *

class DFSSplitReader(DFGNode):
def __init__(self, inputs, outputs, com_name, com_category,
com_options = [], com_redirs = [], com_assignments=[]):

super().__init__(inputs, outputs, com_name, com_category,
com_options=com_options,
com_redirs=com_redirs,
com_assignments=com_assignments)

def set_server_address(self, addr): # ex addr: 127.0.0.1:50051
self.com_options.append((3, Arg(string_to_argument(f"--addr {addr}"))))

def make_dfs_split_reader_node(inputs, output, split_num, prefix):
split_reader_bin = os.path.join(config.PASH_TOP, config.config['runtime']['dfs_split_reader_binary'])
com_name = Arg(string_to_argument(split_reader_bin))
com_category = "pure"
options = []
options.append((1, Arg(string_to_argument(f"--prefix '{prefix}'"))))
options.append((2, Arg(string_to_argument(f"--split {split_num}"))))

return DFSSplitReader(inputs,
[output],
com_name,
com_category,
options)
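(Aside, not part of the diff: given the options assembled above, the command this node stands for would look roughly like the following; the exact invocation is generated by the compiler backend, so treat this as a sketch with placeholder values:)

    dfs_split_reader.sh --prefix 'pash_fifo_prefix' --split 2 --addr 127.0.0.1:50051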
11 changes: 11 additions & 0 deletions compiler/definitions/ir/nodes/hdfs_cat.py
@@ -0,0 +1,11 @@
from definitions.ir.dfg_node import *

class HDFSCat(DFGNode):
def __init__(self, inputs, outputs, com_name, com_category,
com_options = [], com_redirs = [], com_assignments=[]):
assert(str(com_name) == "hdfs")
assert(str(com_options[0][1]) == "dfs" and str(com_options[1][1]) == "-cat")
super().__init__(inputs, outputs, com_name, com_category,
com_options=com_options,
com_redirs=com_redirs,
com_assignments=com_assignments)
30 changes: 30 additions & 0 deletions compiler/definitions/ir/nodes/remote_pipe.py
@@ -0,0 +1,30 @@
from definitions.ir.dfg_node import *

class RemotePipe(DFGNode):
def __init__(self, inputs, outputs, com_name, com_category,
com_options = [], com_redirs = [], com_assignments=[]):
super().__init__(inputs, outputs, com_name, com_category,
com_options=com_options,
com_redirs=com_redirs,
com_assignments=com_assignments)

def make_remote_pipe(inputs, outputs, host_ip, port, is_remote_read, id):
com_category = "pure"
options = []
opt_count = 0

if is_remote_read:
remote_pipe_bin = os.path.join(config.PASH_TOP, config.config['runtime']['remote_read_binary'])
else:
remote_pipe_bin = os.path.join(config.PASH_TOP, config.config['runtime']['remote_write_binary'])

com_name = Arg(string_to_argument(remote_pipe_bin))

options.append((opt_count, Arg(string_to_argument(f"--addr {host_ip}:{port}"))))
options.append((opt_count + 1, Arg(string_to_argument(f"--id {id}"))))

return RemotePipe(inputs,
outputs,
com_name,
com_category,
com_options=options)
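(Aside, not part of the diff: pairing the two ends, the generated writer and reader would connect roughly as follows. This is an illustrative sketch based on the `--addr`/`--id` options above and the stdin/stdout behavior declared in the remote_read/remote_write annotations; `producer_stage` and `consumer_stage` are placeholders, not PaSh's literal generated code:)

    # on the producing host:
    producer_stage | remote_write.sh --addr 10.0.0.2:50051 --id 7
    # on the consuming host:
    remote_read.sh --addr 10.0.0.2:50051 --id 7 | consumer_stage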
