
Commit

Merge 1c13db2 into d283629
agraubert committed May 13, 2020
2 parents d283629 + 1c13db2 commit 538db01
Showing 16 changed files with 682 additions and 223 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -34,6 +34,8 @@ This is a list of available adapters. For more details, see [pipeline_options.md
* `Manual`: (Default) This is the primary input adapter responsible for determining the number of jobs and the inputs for each job, based on the raw inputs provided by the user.
* Inputs which have a single constant value will have the same value for all jobs
* Inputs which have a 1D list of values will have one of those values in each job. By default, all list inputs must have the same length, and there will be one job per element: the nth job will receive the nth value of each input
* Inputs which have a 2D (or deeper) list of values will be flattened to 2D; each nested list is then passed to one job as if it were a single value.
Otherwise the same rules apply: along the first dimension, the list must have the same length as all other list inputs
* There are extra configuration options which can change how inputs are combined or how lists are interpreted
* `Firecloud`/`Terra`: Choose this adapter if you are using data hosted in a FireCloud or Terra workspace.
Your inputs will be interpreted as entity expressions, similar to how FireCloud and Terra workflows interpret inputs. This adapter can also be configured to post results back to your workspace, if you choose. **Warning:** Reading from Workspace buckets is convenient, but you may encounter issues if your Slurm cluster is not logged in using your credentials
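The `Manual` adapter's list rules above can be sketched as follows. This is a hypothetical simplification for illustration (`expand_jobs` is not part of canine): constants repeat across jobs, 1D lists co-iterate, and 2D lists hand one row per job.

```python
from itertools import repeat

def expand_jobs(inputs):
    # Job count is the shared length of the list inputs (1 if there are none)
    lengths = {len(v) for v in inputs.values() if isinstance(v, list)}
    assert len(lengths) <= 1, "all list inputs must share the same length"
    n_jobs = lengths.pop() if lengths else 1
    # Constants repeat for every job; lists supply one element per job
    columns = {
        key: val if isinstance(val, list) else repeat(val, n_jobs)
        for key, val in inputs.items()
    }
    return [
        {key: value for key, value in zip(columns, job)}
        for job in zip(*columns.values())
    ]
```

For example, `expand_jobs({"ref": "hg38", "sample": ["a", "b"]})` yields two jobs, each with the constant `ref` and one `sample`; a 2D input like `{"rows": [[1, 2], [3, 4]]}` hands one nested list to each job.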
@@ -145,7 +147,8 @@ The overrides section should be a dictionary mapping input names to a string de
* Strings which start with `gs://` are interpreted to be files/directories within Google Cloud Storage and will be localized to the Slurm controller
* Any file or Google Storage object which appears as an input to multiple jobs is considered `common` and will be localized once to a common directory, visible to all jobs
* If any input to any arbitrary job is a list, the contents of the list are interpreted using the same rules
* `Common`: Inputs marked as common will be considered common to all jobs and localized once, to a directory which is visible to all jobs. Inputs marked as common which cannot be interpreted as a filepath or a Google Cloud Storage object are ignored and treated as strings.
* Note: You can also use `Common` to force an input list to be handed to all jobs. If an input which was a list is marked as `Common`, the list will be flattened to 1D, and all of its elements will be passed to every job, as if the list were a single value
* `Stream`: Inputs marked as `Stream` will be streamed into a FIFO pipe, and the path to the pipe will be exported to the job. The `Stream` override is ignored for inputs which are not Google Cloud Storage objects, causing those inputs to be localized under default rules. Jobs which are requeued due to node failure will always restart the stream. Streams are created in a temporary directory on the local disk of the compute node
* `Delayed`: Inputs marked as `Delayed` will be downloaded by the job once it starts, instead of upfront during localization. The `Delayed` override is ignored for inputs which are not Google Cloud Storage objects, causing those inputs to be localized under default rules. Jobs which are requeued due to node failures will only re-download delayed inputs if the job failed before the download completed
* `Local`: Similar to `Delayed`. Inputs marked as `Local` will be downloaded by the job once it starts. The difference between `Delayed` and `Local` is that for `Local` files, a new disk is provisioned and mounted to the worker node and `Local` downloads are saved there. The disk is automatically sized
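Put together, an overrides mapping like the one described above might look like this (the input names here are hypothetical, chosen only to illustrate each strategy):

```python
# Hypothetical overrides block: keys are input names,
# values are the localization strategies described above
overrides = {
    "reference_fasta": "Common",   # localized once, visible to all jobs
    "reads_bam": "Stream",         # streamed through a FIFO pipe on local disk
    "annotation_db": "Delayed",    # downloaded by the job after it starts
    "scratch_data": "Local",       # downloaded onto a dedicated provisioned disk
}
```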
92 changes: 85 additions & 7 deletions canine/adapters/base.py
@@ -3,19 +3,75 @@
from itertools import product, repeat
from functools import reduce

class _FixedArray(object):
"""
Helper to capture arrays which are marked as fixed
"""
def __init__(self, items):
self.items = items

@staticmethod
def flatten(items):
"""
        Recursively flattens a nested list, yielding its leaf elements.
        "Common" inputs are only allowed to be 1D arrays;
        otherwise only 2D arrays are allowed
"""
if not isinstance(items, list):
yield items
else:
for item in items:
yield from _FixedArray.flatten(item)

@property
def is_2d(self):
for item in self.items:
if not isinstance(item, list):
return False
return len(self.items) > 0

def __len__(self):
return len(self.items) if self.is_2d else 1

def __iter__(self):
if not self.is_2d:
raise ValueError("FixedArray is not 2d")
for elem in self.items:
if isinstance(elem, list):
yield _FixedArray(elem)
else:
yield elem

    def __getitem__(self, n):
        if not self.is_2d:
            raise ValueError("FixedArray is not 2d")
        elem = self.items[n]
        if isinstance(elem, list):
            return _FixedArray(elem)
        return elem

def stringify(self, two_d):
if two_d and self.is_2d:
return [
[str(item) for item in _FixedArray.flatten(row)]
for row in self.items
]
return [str(item) for item in _FixedArray.flatten(self.items)]
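In isolation, the `flatten` helper above is a plain recursive generator. Restated as a standalone function (not an import from canine), it behaves like this:

```python
def flatten(items):
    # Yield the leaf elements of an arbitrarily nested list,
    # mirroring _FixedArray.flatten above: non-lists are yielded
    # directly, lists are recursed into
    if not isinstance(items, list):
        yield items
    else:
        for item in items:
            yield from flatten(item)
```

So `list(flatten([[1, [2]], 3]))` produces `[1, 2, 3]`; the depth limits mentioned in the docstring are enforced by the callers, not by the generator itself.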

class AbstractAdapter(abc.ABC):
"""
Base class for pipeline input adapters
"""

def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, common_inputs: typing.Optional[typing.Set[str]] = None):
"""
Initializes the adapter.
If alias is provided, it is used to specify custom job aliases.
alias may be a list of strings (an alias for each job) or a single string
(the input variable to use as the alias)
"""
self.alias = alias
self.common_inputs = common_inputs if common_inputs is not None else set()


@abc.abstractmethod
@@ -44,21 +100,27 @@ def spec(self) -> typing.Dict[str, typing.Dict[str, str]]:
"""
pass

def pin_arrays(self, key, val):
pinned = _FixedArray(val)
if pinned.is_2d or key in self.common_inputs:
return pinned
return val

class ManualAdapter(AbstractAdapter):
"""
Handles manual argument formatting
Does pretty much nothing, except maybe combining arguments
"""

def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, product: bool = False, common_inputs: typing.Optional[typing.Set[str]] = None):
"""
Initializes the adapter
If product is True, array arguments will be combined, instead of co-iterated.
If alias is provided, it is used to specify custom job aliases.
alias may be a list of strings (an alias for each job) or a single string
(the input variable to use as the alias)
"""
super().__init__(alias=alias, common_inputs=common_inputs)
self.product = product
self.__spec = None
self._job_length = 0
@@ -69,22 +131,34 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing.
Returns a job input specification useable for Localization
Also sets self.spec to the same dictionary
"""

        # Pin fixed arrays
inputs = {
key: self.pin_arrays(key, val)
for key,val in inputs.items()
}

keys = sorted(inputs)
input_lengths = {
# FixedArrays return actual length if they are 2d
key: len(val) if isinstance(val, list) or (isinstance(val, _FixedArray) and key not in self.common_inputs) else 1
for key, val in inputs.items()
}

#
# HACK: deal with lists of length 1
        # We don't want to also unpack FixedArrays, because an explicit fixed [[...]]
        # should not simply become a regular list or a commonized array
for key, val in inputs.items():
if isinstance(val, list) and len(val) == 1:
inputs[key] = val[0]

if self.product:
self._job_length = reduce(lambda x,y: x*y, input_lengths.values(), 1)
generator = product(
*[inputs[key] if isinstance(inputs[key], list) else (
iter(inputs[key]) if isinstance(inputs[key], _FixedArray) and inputs[key].is_2d else (inputs[key],)
)
for key in keys]
)
else:
@@ -99,12 +173,16 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing.
#
# XXX: simplify this with itertools.zip_longest() ?
generator = zip(*[
inputs[key] if isinstance(inputs[key], list) else (
iter(inputs[key]) if isinstance(inputs[key], _FixedArray) and key not in self.common_inputs else repeat(inputs[key], self._job_length)
)
for key in keys
])
self.__spec = {
str(i): {
# Unpack fixed arrays here
# From localizer perspective, any lists are intentionally fixed lists
key: val.stringify(key not in self.common_inputs) if isinstance(val, _FixedArray) else str(val)
for key, val in zip(keys, job)
}
for i, job in enumerate(generator)
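The two generator branches above reduce to `itertools.product` versus `zip`: with `product=True` every combination of list elements becomes a job, while with `product=False` lists co-iterate and constants repeat. A minimal standalone sketch of the difference:

```python
from itertools import product, repeat

a = [1, 2]
b = ["x", "y"]

# product=True: every combination of elements becomes a job (4 jobs)
product_jobs = list(product(a, b))

# product=False: lists co-iterate and constants repeat (2 jobs)
coiter_jobs = list(zip(a, repeat("const", len(a))))
```

Here `product_jobs` is `[(1, 'x'), (1, 'y'), (2, 'x'), (2, 'y')]` while `coiter_jobs` is `[(1, 'const'), (2, 'const')]`, which is why `_job_length` is a product of lengths in one branch and a single shared length in the other.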
10 changes: 4 additions & 6 deletions canine/adapters/firecloud.py
@@ -17,6 +17,7 @@ def __init__(
self, workspace: str, entityType: str, entityName: str,
entityExpression: typing.Optional[str] = None, write_to_workspace: bool = True,
alias: typing.Union[None, str, typing.List[str]] = None,
common_inputs: typing.Optional[typing.Set[str]] = None
):
"""
Initializes the adapter
@@ -30,7 +31,7 @@ def __init__(
alias may be a list of strings (an alias for each job) or a single string
(the input variable to use as the alias)
"""
super().__init__(alias=alias, common_inputs=common_inputs)
self.workspace = dalmatian.WorkspaceManager(workspace)
if entityName not in self.workspace._get_entities_internal(entityType).index:
raise NameError('No such {} "{}" in workspace {}'.format(
@@ -66,18 +67,15 @@ def evaluate(self, etype: str, entity: str, expr: str) -> str:
elif len(results) == 0:
return None
else:
return results

def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing.List[typing.Any]]]) -> typing.Dict[str, typing.Dict[str, str]]:
"""
Takes raw user inputs and parses out actual inputs for each job
Returns a job input specification useable for Localization
Also sets self.spec to the same dictionary
"""
# FIXME: add fixed array handling for FC
# If constant input:
# this. or workspace. -> evaluate
# gs:// -> raw
4 changes: 2 additions & 2 deletions canine/backends/base.py
@@ -241,12 +241,12 @@ def _rmtree(self, path: str, pathstat: os.stat_result):
for fname in self.listdir(path):
fname = os.path.join(path, fname)
try:
fstat = self.stat(fname, follow_symlinks=False)
except FileNotFoundError:
# Handling for broken symlinks is bad
self.remove(fname)
else:
if stat.S_ISDIR(fstat.st_mode) and not stat.S_ISLNK(fstat.st_mode):
self._rmtree(
fname,
fstat
Expand Down
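The `follow_symlinks=False` change matters because a symlink to a directory reports as a directory when the link is followed, so the old code could recurse through the link into files outside the tree being removed. With plain `os.stat` on a local filesystem (the backend's `stat` is assumed to accept the same keyword), the distinction looks like this:

```python
import os
import stat
import tempfile

# A symlink to a directory is S_ISDIR when followed,
# but S_ISLNK (and not S_ISDIR) when follow_symlinks=False
with tempfile.TemporaryDirectory() as tmp:
    real_dir = os.path.join(tmp, "real")
    os.mkdir(real_dir)
    link = os.path.join(tmp, "link")
    os.symlink(real_dir, link)

    followed = os.stat(link)                             # stats the target
    not_followed = os.stat(link, follow_symlinks=False)  # stats the link itself

assert stat.S_ISDIR(followed.st_mode)
assert stat.S_ISLNK(not_followed.st_mode)
assert not stat.S_ISDIR(not_followed.st_mode)
```

Checking `S_ISLNK` on the unfollowed stat result is what lets `_rmtree` remove the link itself rather than descending into its target.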
