From 84dc87e570107d2d1b08da9a39d49e543128d652 Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Tue, 4 Feb 2020 18:32:55 -0500 Subject: [PATCH 01/10] Working on array inputs Replicated non-array behavior on non-NFS backends --- README.md | 3 + canine/adapters/base.py | 68 +++++++++-- canine/adapters/firecloud.py | 4 +- canine/localization/base.py | 223 ++++++++++++++++++----------------- canine/localization/local.py | 2 +- pipeline_options.md | 5 + 6 files changed, 190 insertions(+), 115 deletions(-) diff --git a/README.md b/README.md index 6514404d..3e3b29d8 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,9 @@ resources: # slurm resources varname: value {--resources varname:value} adapter: # Job input adapter configuration type: [One of: Manual (default), Firecloud] The adapter to map inputs into actual job inputs {--adapter type:value} + arrays: # list of input varnames which should be interpreted as a single input array rather than a list of variable values + - varname # 1-d inputs will be interpreted as a single list which gets exported to all jobs + - varname # 2-d inputs will be interpreted as a list of arrays to export to individual jobs # Other Keyword arguments to provide to adapter # Manual Args: product: (bool, default false) Whether adapter should take the product of all inputs rather than iterating {--adapter product:value} diff --git a/canine/adapters/base.py b/canine/adapters/base.py index 0e6ae091..095f2708 100644 --- a/canine/adapters/base.py +++ b/canine/adapters/base.py @@ -3,12 +3,49 @@ from itertools import product, repeat from functools import reduce +class _FixedArray(object): + """ + Helper to capture arrays which are marked as fixed + """ + def __init__(self, items): + self.items = items + + @property + def is_2d(self): + return len(self.items) > 0 and isinstance(self.items[0], list) + + def __len__(self): + return len(self.items) if self.is_2d else 1 + + def __iter__(self): + if not self.is_2d: + raise ValueError("FixedArray is not 
2d") + for elem in self.items: + if isinstance(elem, list): + yield _FixedArray(elem) + else: + yield elem + + def __getitem__(self, n): + if self.is_2d and len(self) > n: + elem = self.items[n] + if isinstance(elem, list): + return _FixedArray(elem) + return elem + raise ValueError("FixedArray is not 2d") + + def stringify(self): + return [ + [str(item) for item in elem] if self.is_2d else str(elem) + for elem in self.items + ] + class AbstractAdapter(abc.ABC): """ Base class for pipeline input adapters """ - def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None): + def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, arrays: typing.List[str] = None): """ Initializes the adapter. If alias is provided, it is used to specify custom job aliases. @@ -16,6 +53,7 @@ def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None): (the input variable to use as the alias) """ self.alias = alias + self.arrays = arrays if arrays is not None else [] @abc.abstractmethod @@ -50,7 +88,7 @@ class ManualAdapter(AbstractAdapter): Does pretty much nothing, except maybe combining arguments """ - def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, product: bool = False): + def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, product: bool = False, arrays: typing.List[str] = None): """ Initializes the adapter If product is True, array arguments will be combined, instead of co-iterated. @@ -58,7 +96,7 @@ def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, prod alias may be a list of strings (an alias for each job) or a single string (the input variable to use as the alias) """ - super().__init__(alias=alias) + super().__init__(alias=alias, arrays=arrays) self.product = product self.__spec = None self._job_length = 0 @@ -69,14 +107,24 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. 
Returns a job input specification useable for Localization Also sets self.spec to the same dictionary """ + + #Pin fixed arrays + inputs = { + key: _FixedArray(val) if key in self.arrays else val + for key,val in inputs.items() + } + keys = sorted(inputs) input_lengths = { - key: len(val) if isinstance(val, list) else 1 + # FixedArrays return actual length if they are 2d + key: len(val) if isinstance(val, list) or (isinstance(val, _FixedArray) and val.is_2d) else 1 for key, val in inputs.items() } # # HACK: deal with lists of length 1 + # We don't want to also unpack FixedArrays because an explicit fixed [[...]] + # should not simply become a regular-ass list or a commonized array for key, val in inputs.items(): if isinstance(val, list) and len(val) == 1: inputs[key] = val[0] @@ -84,7 +132,9 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. if self.product: self._job_length = reduce(lambda x,y: x*y, input_lengths.values(), 1) generator = product( - *[inputs[key] if isinstance(inputs[key], list) else (inputs[key],) + *[inputs[key] if isinstance(inputs[key], list) else ( + iter(inputs[key]) if isinstance(inputs[key], _FixedArray) and inputs[key].is_2d else (inputs[key],) + ) for key in keys] ) else: @@ -99,12 +149,16 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. # # XXX: simplify this with itertools.zip_longest() ? 
generator = zip(*[ - inputs[key] if isinstance(inputs[key], list) else repeat(inputs[key], self._job_length) + inputs[key] if isinstance(inputs[key], list) else ( + iter(inputs[key]) if isinstance(inputs[key], _FixedArray) and inputs[key].is_2d else repeat(inputs[key], self._job_length) + ) for key in keys ]) self.__spec = { str(i): { - key: str(val) + # Unpack fixed arrays here + # From localizer perspective, any lists are intentionally fixed lists + key: val.stringify() if isinstance(val, _FixedArray) else str(val) for key, val in zip(keys, job) } for i, job in enumerate(generator) diff --git a/canine/adapters/firecloud.py b/canine/adapters/firecloud.py index 79c4a24c..c13914c9 100644 --- a/canine/adapters/firecloud.py +++ b/canine/adapters/firecloud.py @@ -17,6 +17,7 @@ def __init__( self, workspace: str, entityType: str, entityName: str, entityExpression: typing.Optional[str] = None, write_to_workspace: bool = True, alias: typing.Union[None, str, typing.List[str]] = None, + arrays: typing.List[str] = None ): """ Initializes the adapter @@ -30,7 +31,7 @@ def __init__( alias may be a list of strings (an alias for each job) or a single string (the input variable to use as the alias) """ - super().__init__(alias=alias) + super().__init__(alias=alias, arrays=arrays) self.workspace = dalmatian.WorkspaceManager(workspace) if entityName not in self.workspace._get_entities_internal(entityType).index: raise NameError('No such {} "{}" in workspace {}'.format( @@ -78,6 +79,7 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. Returns a job input specification useable for Localization Also sets self.spec to the same dictionary """ + # FIXME: add fixed array handling for FC # If constant input: # this. or workspace. 
-> evaluate # gs:// -> raw diff --git a/canine/localization/base.py b/canine/localization/base.py index 502de07a..7e132bfe 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -19,7 +19,7 @@ from agutil import status_bar Localization = namedtuple("Localization", ['type', 'path']) -# types: stream, download, None +# types: stream, download, array, None # indicates what kind of action needs to be taken during job startup PathType = namedtuple( @@ -364,7 +364,7 @@ def delocalize(self, patterns: typing.Dict[str, str], output_dir: str = 'canine_ output_files[jobId][outputname] = [dirpath] return output_files - def pick_common_inputs(self, inputs: typing.Dict[str, typing.Dict[str, str]], overrides: typing.Dict[str, typing.Optional[str]], transport: typing.Optional[AbstractTransport] = None) -> typing.Dict[str, str]: + def pick_common_inputs(self, inputs: typing.Dict[str, typing.Dict[str, typing.Union[str, typing.List[str]]]], overrides: typing.Dict[str, typing.Optional[str]], transport: typing.Optional[AbstractTransport] = None) -> typing.Dict[str, str]: """ Scans input configuration and overrides to choose inputs which should be treated as common. 
Returns the dictionary of common inputs {input path: common path} @@ -373,12 +373,13 @@ def pick_common_inputs(self, inputs: typing.Dict[str, typing.Dict[str, str]], ov self.common_inputs = set() seen = set() for jobId, values in inputs.items(): - for arg, path in values.items(): - if path in seen and (arg not in overrides or overrides[arg] == 'common'): - self.common_inputs.add(path) - if arg in overrides and overrides[arg] == 'common': - self.common_inputs.add(path) - seen.add(path) + for arg, paths in values.items(): + for path in (paths if isinstance(paths, list) else (paths,)): + if path in seen and (arg not in overrides or overrides[arg] == 'common'): + self.common_inputs.add(path) + if arg in overrides and overrides[arg] == 'common': + self.common_inputs.add(path) + seen.add(path) common_dests = {} for path in self.common_inputs: if path.startswith('gs://') or os.path.exists(path): @@ -404,81 +405,106 @@ def finalize_staging_dir(self, jobs: typing.Iterable[str], transport: typing.Opt transport.mkdir(controller_env['CANINE_OUTPUT']) return transport.normpath(self.staging_dir) - def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], common_dests: typing.Dict[str, str], overrides: typing.Dict[str, typing.Optional[str]], transport: typing.Optional[AbstractTransport] = None): + def handle_input(self, jobId: str, input_name: str, input_value: typing.Union[str, typing.List[str]], common_dests: typing.Dict[str, str], overrides: typing.Dict[str, typing.Optional[str]], transport: AbstractTransport) -> Localization: + """ + Handles a singular input. + Localizes sraightforward files. 
Stream/Delay files are marked for later handling + Returns Localization objects for the determined actions still required + (Localization objects are handled during setup_teardown) + Job array inputs have their elements individually handled here + """ + if isinstance(input_value, list): + return Localization( + 'array', + [ + self.handle_input( + jobId, + input_name, + common_dests, + overrides, + transport + ) + for elem in input_value + ] + ) + mode = overrides[input_name] if input_name in overrides else False + if input_value in common_dests or mode == 'common': + # common override already handled + # No localization needed + return Localization(None, common_dests[input_value]) + elif mode is not False: # User has specified an override + if mode == 'stream': + if input_value.startswith('gs://'): + return Localization('stream', input_value) + print("Ignoring 'stream' override for", input_name, "with value", input_value, "and localizing now", file=sys.stderr) + elif mode == 'delayed': + if input_value.startswith('gs://'): + return Localization('download', input_value) + print("Ignoring 'delayed' override for", input_name, "with value", input_value, "and localizing now", file=sys.stderr) + elif mode == None: + return Localization(None, input_value) + # At this point, no overrides have taken place, so handle by default + if os.path.exists(input_value) or input_value.startswith('gs://'): + remote_path = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(input_value))) + self.localize_file(input_value, remote_path, transport=transport) + input_value = remote_path + return Localization( + None, + # value will either be a PathType, if localized above, + # or an unchanged string if not handled + input_value + ) + + + def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, typing.Union[str, typing.List[str]]], common_dests: typing.Dict[str, str], overrides: typing.Dict[str, typing.Optional[str]], transport: 
typing.Optional[AbstractTransport] = None): """ Prepares job-specific inputs. Fills self.inputs[jobId] with Localization objects for each input + Just a slight logical wrapper over the recursive handle_input """ if 'CANINE_JOB_ALIAS' in job_inputs and 'CANINE_JOB_ALIAS' not in overrides: overrides['CANINE_JOB_ALIAS'] = None with self.transport_context(transport) as transport: self.inputs[jobId] = {} for arg, value in job_inputs.items(): - mode = overrides[arg] if arg in overrides else False - if value in common_dests or mode == 'common': - # common override already handled - # No localization needed, already copied - self.inputs[jobId][arg] = Localization(None, common_dests[value]) - elif mode is not False: - if mode == 'stream': - if not value.startswith('gs://'): - print("Ignoring 'stream' override for", arg, "with value", value, "and localizing now", file=sys.stderr) - self.inputs[jobId][arg] = Localization( - None, - self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value))) - ) - self.localize_file( - value, - self.inputs[jobId][arg].path, - transport=transport - ) - else: - self.inputs[jobId][arg] = Localization( - 'stream', - value - ) - elif mode == 'localize': - self.inputs[jobId][arg] = Localization( - None, - self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value))) - ) - self.localize_file( - value, - self.inputs[jobId][arg].path, - transport=transport - ) - elif mode == 'delayed': - if not value.startswith('gs://'): - print("Ignoring 'delayed' override for", arg, "with value", value, "and localizing now", file=sys.stderr) - self.inputs[jobId][arg] = Localization( - None, - self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value))) - ) - self.localize_file( - value, - self.inputs[jobId][arg].path, - transport=transport - ) - else: - self.inputs[jobId][arg] = Localization( - 'download', - value - ) - elif mode is None: - # Do not reserve path here - # null override treats 
input as string - self.inputs[jobId][arg] = Localization(None, value) - else: - if os.path.exists(value) or value.startswith('gs://'): - remote_path = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value))) - self.localize_file(value, remote_path, transport=transport) - value = remote_path - self.inputs[jobId][arg] = Localization( - None, - # value will either be a PathType, if localized above, - # or an unchanged string if not handled - value - ) + self.inputs[jobId][arg] = self.handle_input( + jobId, + arg, + value, + common_dests, + overrides, + transport + ) + + def final_localization(self, jobId: str, request: Localization) -> typing.Tuple[typing.List[str], str]: + """ + Processes final localization commands. + Returns a list of extra tasts to add, as well as the final exportable value + """ + if request.type == 'stream': + dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(request.path))) + return [ + 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), + 'mkfifo {}'.format(dest.computepath), + "gsutil {} cat {} > {} &".format( + '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(request.path) else '', + shlex.quote(request.path), + dest.computepath + ) + ], '"{}"'.format(dest.computepath) + elif request.type == 'download': + dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(request.path))) + return [ + "if [[ ! 
-e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( + '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(request.path) else '', + shlex.quote(request.path), + dest.computepath + ) + ], '"{}"'.format(dest.computepath) + elif request.type == None: + return [], shlex.quote(request.path.computepath if isinstance(request.path, PathType) else request.path) + raise TypeError("request type '{}' not supported at this stage".format(request.type)) + def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str]: """ @@ -492,44 +518,29 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ ] compute_env = self.environment('compute') for key, val in self.inputs[jobId].items(): - if val.type == 'stream': - job_vars.append(shlex.quote(key)) - dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ - 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), - 'mkfifo {}'.format(dest.computepath), - "gsutil {} cat {} > {} &".format( - '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', - shlex.quote(val.path), - dest.computepath - ) - ] - exports.append('export {}="{}"'.format( - key, - dest.computepath - )) - elif val.type == 'download': - job_vars.append(shlex.quote(key)) - dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ - "if [[ ! 
-e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( - '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', - shlex.quote(val.path), - dest.computepath + job_vars.append(shlex.quote(key)) + if val.type == 'array': + # Hack: the array elements are exposed here as the Localization arg's .path attr + array_values = [] + for request in val.path: + new_tasks, exportable_value = self.final_localization( + jobId, request ) - ] + extra_tasks += new_tasks + array_values.append(exportable_value) exports.append('export {}="{}"'.format( key, - dest.computepath + ':'.join(array_values) )) - elif val.type is None: - job_vars.append(shlex.quote(key)) + else: + new_tasks, exportable_value = self.final_localization( + jobId, val + ) + extra_tasks += new_tasks exports.append('export {}={}'.format( key, - shlex.quote(val.path.computepath if isinstance(val.path, PathType) else val.path) + exportable_value )) - else: - print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr) setup_script = '\n'.join( line.rstrip() for line in [ @@ -592,7 +603,7 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs pass @abc.abstractmethod - def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: typing.Dict[str, str], overrides: typing.Optional[typing.Dict[str, typing.Optional[str]]] = None) -> str: + def localize(self, inputs: typing.Dict[str, typing.Dict[str, typing.Union[str, typing.List[str]]]], patterns: typing.Dict[str, str], overrides: typing.Optional[typing.Dict[str, typing.Optional[str]]] = None) -> str: """ 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs diff --git a/canine/localization/local.py b/canine/localization/local.py index 3f986151..fa5d9d42 100644 --- a/canine/localization/local.py +++ b/canine/localization/local.py @@ -72,7 +72,7 @@ def __enter__(self): 
os.mkdir(self.environment('local')['CANINE_OUTPUT']) return self - def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: typing.Dict[str, str], overrides: typing.Optional[typing.Dict[str, typing.Optional[str]]] = None) -> str: + def localize(self, inputs: typing.Dict[str, typing.Dict[str, typing.Union[str, typing.List[str]]]], patterns: typing.Dict[str, str], overrides: typing.Optional[typing.Dict[str, typing.Optional[str]]] = None) -> str: """ 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs diff --git a/pipeline_options.md b/pipeline_options.md index 0d251287..8d9a34a7 100644 --- a/pipeline_options.md +++ b/pipeline_options.md @@ -90,6 +90,11 @@ adapter to use, and how it should be configured. Adapters are used to parse the input configuration provided by the user into a specific set of inputs for each job. Below are the two types of available input adapters, and an example configuration for each. +### Global Options + +* `arrays`: A list of input names which should be considered a bash array, instead of +an array of individual arguments + ### Manual (default) adapter The Manual adapter is default. It takes inputs at face value. 
Constant inputs will From 1fae6eee3a9e42311077aa9611feabbb850a44cf Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Wed, 5 Feb 2020 12:57:46 -0500 Subject: [PATCH 02/10] Finished array inputs for non-firecloud adapters --- canine/localization/base.py | 44 +++++++++--- canine/localization/nfs.py | 81 --------------------- docs/canine/adapters.html | 4 +- docs/canine/localization.html | 128 ++++++++++++++++++++++++++++++---- docs/genindex.html | 37 ++++++++++ docs/objects.inv | Bin 1788 -> 1848 bytes docs/searchindex.js | 2 +- 7 files changed, 187 insertions(+), 109 deletions(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index 7e132bfe..b1ec4e13 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -364,29 +364,50 @@ def delocalize(self, patterns: typing.Dict[str, str], output_dir: str = 'canine_ output_files[jobId][outputname] = [dirpath] return output_files + def flatten_inputs(self, obj: typing.Any) -> typing.Generator[str, None, None]: + """ + Recurses over any object given to yield the flattened elements. + Used to pick out the filepaths which will be commonized + """ + if isinstance(obj, dict): + for val in obj.values(): + yield from self.flatten_inputs(val) + elif isinstance(obj, list): + for val in obj: + yield from self.flatten_inputs(val) + else: + yield obj + def pick_common_inputs(self, inputs: typing.Dict[str, typing.Dict[str, typing.Union[str, typing.List[str]]]], overrides: typing.Dict[str, typing.Optional[str]], transport: typing.Optional[AbstractTransport] = None) -> typing.Dict[str, str]: """ Scans input configuration and overrides to choose inputs which should be treated as common. 
Returns the dictionary of common inputs {input path: common path} """ with self.transport_context(transport) as transport: - self.common_inputs = set() seen = set() + self.common_inputs = set() + seen_forced = set() for jobId, values in inputs.items(): for arg, paths in values.items(): - for path in (paths if isinstance(paths, list) else (paths,)): - if path in seen and (arg not in overrides or overrides[arg] == 'common'): - self.common_inputs.add(path) - if arg in overrides and overrides[arg] == 'common': - self.common_inputs.add(path) - seen.add(path) + if arg not in overrides: + print(arg, "is not overridden; scanning inputs") + for path in self.flatten_inputs(paths): + if path in seen: + print("already encountered", path, " (adding to common)") + self.common_inputs.add(path) + else: + print("first encounter of", path) + seen.add(path) + elif arg not in seen_forced and overrides[arg] == 'common': + print("Arg", arg, "not already forced") + print("Forcing", arg, [*self.flatten_inputs(paths)]) + seen_forced.add(arg) + self.common_inputs |= {*self.flatten_inputs(paths)} common_dests = {} for path in self.common_inputs: if path.startswith('gs://') or os.path.exists(path): common_dests[path] = self.reserve_path('common', os.path.basename(os.path.abspath(path))) self.localize_file(path, common_dests[path], transport=transport) -# else: -# print("Could not handle common file", path, file=sys.stderr) return {key: value for key, value in common_dests.items()} def finalize_staging_dir(self, jobs: typing.Iterable[str], transport: typing.Optional[AbstractTransport] = None) -> str: @@ -420,6 +441,7 @@ def handle_input(self, jobId: str, input_name: str, input_value: typing.Union[st self.handle_input( jobId, input_name, + elem, common_dests, overrides, transport @@ -528,9 +550,9 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ ) extra_tasks += new_tasks array_values.append(exportable_value) - exports.append('export {}="{}"'.format( + 
exports.append('export {}={}'.format( key, - ':'.join(array_values) + shlex.quote('\t'.join(array_values)) )) else: new_tasks, exportable_value = self.final_localization( diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 4cf295ac..eef00629 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -149,87 +149,6 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty ) return self.finalize_staging_dir(inputs) - def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str]: - """ - Returns a tuple of (setup script, teardown script) for the given job id. - Must call after pre-scanning inputs - """ - job_vars = [] - exports = [] - extra_tasks = [ - 'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi' - ] - compute_env = self.environment('compute') - for key, val in self.inputs[jobId].items(): - if val.type == 'stream': - job_vars.append(shlex.quote(key)) - dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ - 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), - 'mkfifo {}'.format(dest.computepath), - "gsutil {} cat {} > {} &".format( - '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', - shlex.quote(val.path), - dest.computepath - ) - ] - exports.append('export {}="{}"'.format( - key, - dest.computepath - )) - elif val.type == 'download': - job_vars.append(shlex.quote(key)) - dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ - "if [[ ! 
-e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( - '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', - shlex.quote(val.path), - dest.computepath - ) - ] - exports.append('export {}="{}"'.format( - key, - dest.computepath - )) - elif val.type is None: - job_vars.append(shlex.quote(key)) - exports.append('export {}={}'.format( - key, - shlex.quote(val.path.computepath if isinstance(val.path, PathType) else val.path) - )) - else: - print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr) - setup_script = '\n'.join( - line.rstrip() - for line in [ - '#!/bin/bash', - 'export CANINE_JOB_VARS={}'.format(':'.join(job_vars)), - 'export CANINE_JOB_INPUTS="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'inputs')), - 'export CANINE_JOB_ROOT="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), - 'export CANINE_JOB_SETUP="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'setup.sh')), - 'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')), - 'mkdir -p $CANINE_JOB_INPUTS', - 'mkdir -p $CANINE_JOB_ROOT', - ] + exports + extra_tasks - ) + '\ncd $CANINE_JOB_ROOT\n' - teardown_script = '\n'.join( - line.rstrip() - for line in [ - '#!/bin/bash', - 'if [[ -d {0} ]]; then cd {0}; fi'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), - # 'mv ../stderr ../stdout .', - 'if which python3 2>/dev/null >/dev/null; then python3 {0} {1} {2} {3}; else python {0} {1} {2} {3}; fi'.format( - os.path.join(compute_env['CANINE_ROOT'], 'delocalization.py'), - compute_env['CANINE_OUTPUT'], - jobId, - ' '.join( - '-p {} {}'.format(name, shlex.quote(pattern)) - for name, pattern in patterns.items() - ) - ), - ] - ) - return setup_script, teardown_script def delocalize(self, patterns: typing.Dict[str, str], output_dir: typing.Optional[str] = None) -> 
typing.Dict[str, typing.Dict[str, str]]: """ diff --git a/docs/canine/adapters.html b/docs/canine/adapters.html index 39df5ae6..225d4c83 100644 --- a/docs/canine/adapters.html +++ b/docs/canine/adapters.html @@ -37,7 +37,7 @@

canine.adapters
-class canine.adapters.ManualAdapter(alias: Union[None, str, List[str]] = None, product: bool = False)
+class canine.adapters.ManualAdapter(alias: Union[None, str, List[str]] = None, product: bool = False, arrays: List[str] = None)

Handles manual argument formatting Does pretty much nothing, except maybe combining arguments

@@ -66,7 +66,7 @@

canine.adapters
-class canine.adapters.FirecloudAdapter(workspace: str, entityType: str, entityName: str, entityExpression: Optional[str] = None, write_to_workspace: bool = True, alias: Union[None, str, List[str]] = None)
+class canine.adapters.FirecloudAdapter(workspace: str, entityType: str, entityName: str, entityExpression: Optional[str] = None, write_to_workspace: bool = True, alias: Union[None, str, List[str]] = None, arrays: List[str] = None)

Job input adapter Parses inputs as firecloud expression bois if enabled, job outputs will be written back to workspace

diff --git a/docs/canine/localization.html b/docs/canine/localization.html index bbe5c6c5..90b27123 100644 --- a/docs/canine/localization.html +++ b/docs/canine/localization.html @@ -58,6 +58,13 @@

canine.localization +
+final_localization(jobId: str, request: canine.localization.base.Localization) → Tuple[List[str], str]
+

Processes final localization commands. +Returns a list of extra tasks to add, as well as the final exportable value

+

+
finalize_staging_dir(jobs: Iterable[str], transport: Optional[canine.backends.base.AbstractTransport] = None) → str
@@ -66,6 +73,13 @@

canine.localization +
+flatten_inputs(obj: Any) → Generator[str, None, None]
+

Recurses over any object given to yield the flattened elements. +Used to pick out the filepaths which will be commonized

+

+
get_requester_pays(path: str) → bool
@@ -94,6 +108,16 @@

canine.localization +
+handle_input(jobId: str, input_name: str, input_value: Union[str, List[str]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: canine.backends.base.AbstractTransport) → canine.localization.base.Localization
+

Handles a singular input. +Localizes straightforward files. Stream/Delay files are marked for later handling +Returns Localization objects for the determined actions still required +(Localization objects are handled during setup_teardown) +Job array inputs have their elements individually handled here

+

+
job_setup_teardown(jobId: str, patterns: Dict[str, str]) → Tuple[str, str]
@@ -103,7 +127,7 @@

canine.localization
-localize(inputs: Dict[str, Dict[str, str]], patterns: Dict[str, str], overrides: Optional[Dict[str, Optional[str]]] = None) → str
+localize(inputs: Dict[str, Dict[str, Union[str, List[str]]]], patterns: Dict[str, str], overrides: Optional[Dict[str, Optional[str]]] = None) → str

3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. For each job, check the predetermined strategy @@ -123,16 +147,17 @@

canine.localization
-pick_common_inputs(inputs: Dict[str, Dict[str, str]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]
+pick_common_inputs(inputs: Dict[str, Dict[str, Union[str, List[str]]]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]

Scans input configuration and overrides to choose inputs which should be treated as common. Returns the dictionary of common inputs {input path: common path}

-prepare_job_inputs(jobId: str, job_inputs: Dict[str, str], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)
+prepare_job_inputs(jobId: str, job_inputs: Dict[str, Union[str, List[str]]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)

Prepares job-specific inputs. -Fills self.inputs[jobId] with Localization objects for each input

+Fills self.inputs[jobId] with Localization objects for each input +Just a slight logical wrapper over the recursive handle_input

@@ -193,6 +218,13 @@

canine.localization +
+final_localization(jobId: str, request: canine.localization.base.Localization) → Tuple[List[str], str]
+

Processes final localization commands. +Returns a list of extra tasks to add, as well as the final exportable value

+

+
finalize_staging_dir(jobs: Iterable[str], transport: Optional[canine.backends.base.AbstractTransport] = None) → str
@@ -201,6 +233,13 @@

canine.localization +
+flatten_inputs(obj: Any) → Generator[str, None, None]
+

Recurses over any object given to yield the flattened elements. +Used to pick out the filepaths which will be commonized

+

+
get_requester_pays(path: str) → bool
@@ -229,6 +268,16 @@

canine.localization +
+handle_input(jobId: str, input_name: str, input_value: Union[str, List[str]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: canine.backends.base.AbstractTransport) → canine.localization.base.Localization
+

Handles a singular input. +Localizes straightforward files. Stream/Delay files are marked for later handling +Returns Localization objects for the determined actions still required +(Localization objects are handled during setup_teardown) +Job array inputs have their elements individually handled here

+

+
job_setup_teardown(jobId: str, patterns: Dict[str, str]) → Tuple[str, str]
@@ -238,7 +287,7 @@

canine.localization
-localize(inputs: Dict[str, Dict[str, str]], patterns: Dict[str, str], overrides: Optional[Dict[str, Optional[str]]] = None) → str
+localize(inputs: Dict[str, Dict[str, Union[str, List[str]]]], patterns: Dict[str, str], overrides: Optional[Dict[str, Optional[str]]] = None) → str

3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. For each job, check the predetermined strategy @@ -258,16 +307,17 @@

canine.localization
-pick_common_inputs(inputs: Dict[str, Dict[str, str]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]
+pick_common_inputs(inputs: Dict[str, Dict[str, Union[str, List[str]]]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]

Scans input configuration and overrides to choose inputs which should be treated as common. Returns the dictionary of common inputs {input path: common path}

-prepare_job_inputs(jobId: str, job_inputs: Dict[str, str], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)
+prepare_job_inputs(jobId: str, job_inputs: Dict[str, Union[str, List[str]]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)

Prepares job-specific inputs. -Fills self.inputs[jobId] with Localization objects for each input

+Fills self.inputs[jobId] with Localization objects for each input +Just a slight logical wrapper over the recursive handle_input

@@ -320,6 +370,13 @@

canine.localization +
+final_localization(jobId: str, request: canine.localization.base.Localization) → Tuple[List[str], str]
+

Processes final localization commands. +Returns a list of extra tasks to add, as well as the final exportable value

+

+
finalize_staging_dir(jobs: Iterable[str], transport: Optional[canine.backends.base.AbstractTransport] = None) → str
@@ -328,6 +385,13 @@

canine.localization +
+flatten_inputs(obj: Any) → Generator[str, None, None]
+

Recurses over any object given to yield the flattened elements. +Used to pick out the filepaths which will be commonized

+

+
get_requester_pays(path: str) → bool
@@ -356,6 +420,16 @@

canine.localization +
+handle_input(jobId: str, input_name: str, input_value: Union[str, List[str]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: canine.backends.base.AbstractTransport) → canine.localization.base.Localization
+

Handles a singular input. +Localizes straightforward files. Stream/Delay files are marked for later handling +Returns Localization objects for the determined actions still required +(Localization objects are handled during setup_teardown) +Job array inputs have their elements individually handled here

+

+
job_setup_teardown(jobId: str, patterns: Dict[str, str]) → Tuple[str, str]
@@ -384,16 +458,17 @@

canine.localization
-pick_common_inputs(inputs: Dict[str, Dict[str, str]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]
+pick_common_inputs(inputs: Dict[str, Dict[str, Union[str, List[str]]]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]

Scans input configuration and overrides to choose inputs which should be treated as common. Returns the dictionary of common inputs {input path: common path}

-prepare_job_inputs(jobId: str, job_inputs: Dict[str, str], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)
+prepare_job_inputs(jobId: str, job_inputs: Dict[str, Union[str, List[str]]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)

Prepares job-specific inputs. -Fills self.inputs[jobId] with Localization objects for each input

+Fills self.inputs[jobId] with Localization objects for each input +Just a slight logical wrapper over the recursive handle_input

@@ -452,6 +527,13 @@

canine.localization +
+final_localization(jobId: str, request: canine.localization.base.Localization) → Tuple[List[str], str]
+

Processes final localization commands. +Returns a list of extra tasks to add, as well as the final exportable value

+

+
finalize_staging_dir(jobs: Iterable[str], transport: Optional[canine.backends.base.AbstractTransport] = None) → str
@@ -460,6 +542,13 @@

canine.localization +
+flatten_inputs(obj: Any) → Generator[str, None, None]
+

Recurses over any object given to yield the flattened elements. +Used to pick out the filepaths which will be commonized

+

+
get_requester_pays(path: str) → bool
@@ -488,6 +577,16 @@

canine.localization +
+handle_input(jobId: str, input_name: str, input_value: Union[str, List[str]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: canine.backends.base.AbstractTransport) → canine.localization.base.Localization
+

Handles a singular input. +Localizes straightforward files. Stream/Delay files are marked for later handling +Returns Localization objects for the determined actions still required +(Localization objects are handled during setup_teardown) +Job array inputs have their elements individually handled here

+

+
job_setup_teardown(jobId: str, patterns: Dict[str, str]) → Tuple[str, str]
@@ -518,16 +617,17 @@

canine.localization
-pick_common_inputs(inputs: Dict[str, Dict[str, str]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]
+pick_common_inputs(inputs: Dict[str, Dict[str, Union[str, List[str]]]], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None) → Dict[str, str]

Scans input configuration and overrides to choose inputs which should be treated as common. Returns the dictionary of common inputs {input path: common path}

-prepare_job_inputs(jobId: str, job_inputs: Dict[str, str], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)
+prepare_job_inputs(jobId: str, job_inputs: Dict[str, Union[str, List[str]]], common_dests: Dict[str, str], overrides: Dict[str, Optional[str]], transport: Optional[canine.backends.base.AbstractTransport] = None)

Prepares job-specific inputs. -Fills self.inputs[jobId] with Localization objects for each input

+Fills self.inputs[jobId] with Localization objects for each input +Just a slight logical wrapper over the recursive handle_input

diff --git a/docs/genindex.html b/docs/genindex.html index 354d0dfa..b7e7b572 100644 --- a/docs/genindex.html +++ b/docs/genindex.html @@ -42,6 +42,7 @@

Index

| E | F | G + | H | I | J | K @@ -196,6 +197,16 @@

F

+
  • final_localization() (canine.localization.BatchedLocalizer method) + +
  • finalize_staging_dir() (canine.localization.BatchedLocalizer method) @@ -212,6 +223,16 @@

    F

    @@ -261,6 +282,22 @@

    G

    +

    H

    + + +
    +

    I

      diff --git a/docs/objects.inv b/docs/objects.inv index d7fcf8aa4301892f8ba39eadda6aba88d3707da3..8e5983cfc2542b3f9a25ffda01f649eedf66524c 100644 GIT binary patch delta 1748 zcmV;_1}pje4Y&@Fcz@eA5Qgu13U=D7)%KcO;!fJkByFb7Onc+NAS59I0TcioE2q9j zU$0M6khEkG1Z<1=(UUa<@c~=``|na~P$Icxl|H;`RnrMp{ZP^2()V08?0&s-t@w6q zy85oVT8)kC-wfR`>kM(q^qr|7)>c18`X)N9H(ZFCD!Jj?VSjYycB;9EU^-cAu9-MK zz~u=4m&+~R9A4g^G(ZPZ!7X$)0%@#_;VPHmyC(T2h%-6SsrFlx7kF z^2sCPcYh4Ya)zOJt%6NWk`{z&QH#qy#4ZpJ%<4YDauCoLA{OqP-~AWw@^j)jW=)!= z-m^pPRP94l=}_}qH{6n2!D>wmY587xdGzDJ@PODfLgms6Iib#P+FGbBmq7CJus4De ziZl-|M{8oX76t|d)Ddz;4Lgw#N*12rFHaegP)y>G#Dmg@BtIjGNHa51i6@}T6N)r6C#^_R zmnRo#W=4vUhAvJt(#X7YBh6j>%VZLnna3o#$ZQ5&!%?<+qfp((|5mY4zE-15l}4fZ zj8c^uWoj}CRbv#X!zfaLQLgr)Y}JM6-+$@zuf%aBV^weG?WxMQ$vtoSCC{4o950cP@!8ztL>c4m+_ToO^w>1Aa&G-miia-I<6n|22>J*aE z0u}Ph>`0(Yg=n-=&G=xk3i&X-nuAf26_PQk6;ktst8-jLPj?s^Qa^BwjjzqN>jmQ% zCUjKh&KS>hUQ^+>W^DGCF#L+{ok-$1n2ShppLqddV^q6maesaR#tTFyfjidb&I0ft zI{riQWrY4&GAUjN8nz*w$bYr#u=hmbw4Dn$oo@0W{lP!%;;-nsvre@i4W~YEx^Vwa z`uYviNtZSJn(8iD?DKMp_kjS)aPm_gqGc^$B3?;ZtNkNEjB})6jC7!Gw>*Smji8J<$LC`yX5R`v$=pp6i>H qU)GVK^x^8*FtR?38)F>Q`Vf)srai`MeQ~em|8|k#(FwSc##fEK%(SO12WV48%J5ej9fFEyQ zIl}#AV#_v%*Y_t4Q5Libne5mWv)ZZsJ<~wB?3%aZk$?%Otk*!+a8GzgY|t19I7PI9 znhDiegNl@$MPwQUG!6BMh>q}_M=&8E7%argvep#1*i?6f?m;v`XFs;Zoc+}PlXO4y zXHA4*Anb>hY=7aFce;Hyo)NJ8l$`=yyE)EUHyND97#q52gfW=7mwPA#=ge5=OvVyH z!PLaboc0fbm!dPVk)_ywbj>@qsW~Z=|j%?~IeyS_5egX@h-~lz;GS!JJVr6mkyDk;FL-NtEW` z3>6B%GGr$}%h8ztE?rmxunZ*$(2}KN&dQw7b0GQ*{(OFgY18+FSks*gxG*ke4UaC) zFeoTy8m5s*E*mG zd%$E_4S!g?R-#>+AT1KA#d0q56uY2+(5&tfEQ5vEfa|3^sqO8n57&)=OkID)DeO5;FKS!m9|8rJ)MC-G%5&$2TmjTRZd%O$V<5?^(3Giv@ zS%6Q5Po(=e<^mO3#Uis(io9v_@217~> zqJMbI1R**ILYfd|BP9zj@Y7R}IAdm+rEU7)P+DC^WhDuxMiKaq*?v z8vt9VeLRLt&zT4!?Lo06+QS!dIz)N)$k^iS=O}0c!I>1;0e5dn<2{Ygo`b{Q#i8pPT*+5 z>rt^}cY?(c-3by|a>wtPII8Qh(1h3LqG_)W#+P4z1Z)MK@i9a=07cMYpNcKTKEI6b zAqujO#@6H+A6b@tKAJMm!C2z#lX3Lfr{+s^XBiLvJND2Jdm}wIzSY~Vmw(1zfUBUO 
zpD`|UUQ_O-X6WjdFq}sd(Zu#|SVRgd+XBQ!%XSZ8)m?ybGM-6b7TVmaK)4YV{_?Yo z(BDfY#VrI4Y)Hp*?K;do9@!pa0jE<&{r&-e^W$2(obgPWrkD(+Qh3 z{FU{U6Kh4XSD_K ze4Cuy0q;8 Date: Wed, 5 Feb 2020 13:02:55 -0500 Subject: [PATCH 03/10] Firecloud adapter should not allow fixed arrays It already does implicitly; If an expression evaluates to an array, that will be handled. Adding the arrays: option was just confusing --- canine/adapters/base.py | 6 +++--- canine/adapters/firecloud.py | 9 ++------- docs/canine/adapters.html | 2 +- pipeline_options.md | 12 +++++++----- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/canine/adapters/base.py b/canine/adapters/base.py index 095f2708..dc24290c 100644 --- a/canine/adapters/base.py +++ b/canine/adapters/base.py @@ -45,7 +45,7 @@ class AbstractAdapter(abc.ABC): Base class for pipeline input adapters """ - def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, arrays: typing.List[str] = None): + def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None): """ Initializes the adapter. If alias is provided, it is used to specify custom job aliases. 
@@ -53,7 +53,6 @@ def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, arra (the input variable to use as the alias) """ self.alias = alias - self.arrays = arrays if arrays is not None else [] @abc.abstractmethod @@ -96,7 +95,8 @@ def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, prod alias may be a list of strings (an alias for each job) or a single string (the input variable to use as the alias) """ - super().__init__(alias=alias, arrays=arrays) + super().__init__(alias=alias) + self.arrays = arrays if arrays is not None else [] self.product = product self.__spec = None self._job_length = 0 diff --git a/canine/adapters/firecloud.py b/canine/adapters/firecloud.py index c13914c9..430017eb 100644 --- a/canine/adapters/firecloud.py +++ b/canine/adapters/firecloud.py @@ -17,7 +17,6 @@ def __init__( self, workspace: str, entityType: str, entityName: str, entityExpression: typing.Optional[str] = None, write_to_workspace: bool = True, alias: typing.Union[None, str, typing.List[str]] = None, - arrays: typing.List[str] = None ): """ Initializes the adapter @@ -31,7 +30,7 @@ def __init__( alias may be a list of strings (an alias for each job) or a single string (the input variable to use as the alias) """ - super().__init__(alias=alias, arrays=arrays) + super().__init__(alias=alias) self.workspace = dalmatian.WorkspaceManager(workspace) if entityName not in self.workspace._get_entities_internal(entityType).index: raise NameError('No such {} "{}" in workspace {}'.format( @@ -67,11 +66,7 @@ def evaluate(self, etype: str, entity: str, expr: str) -> str: elif len(results) == 0: return None else: - raise ValueError("Expression '{}' on {} {} returned more than one result".format( - expr, - etype, - entity - )) + return results def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing.List[typing.Any]]]) -> typing.Dict[str, typing.Dict[str, str]]: """ diff --git a/docs/canine/adapters.html 
b/docs/canine/adapters.html index 225d4c83..35650c7a 100644 --- a/docs/canine/adapters.html +++ b/docs/canine/adapters.html @@ -66,7 +66,7 @@

      canine.adapters
      -class canine.adapters.FirecloudAdapter(workspace: str, entityType: str, entityName: str, entityExpression: Optional[str] = None, write_to_workspace: bool = True, alias: Union[None, str, List[str]] = None, arrays: List[str] = None)
      +class canine.adapters.FirecloudAdapter(workspace: str, entityType: str, entityName: str, entityExpression: Optional[str] = None, write_to_workspace: bool = True, alias: Union[None, str, List[str]] = None)

      Job input adapter Parses inputs as firecloud expression bois if enabled, job outputs will be written back to workspace

      diff --git a/pipeline_options.md b/pipeline_options.md index 8d9a34a7..d73b832a 100644 --- a/pipeline_options.md +++ b/pipeline_options.md @@ -90,11 +90,6 @@ adapter to use, and how it should be configured. Adapters are used to parse the input configuration provided by the user into a specific set of inputs for each job. Below are the two types of available input adapters, and an example configuration for each. -### Global Options - -* `arrays`: A list of input names which should be considered a bash array, instead of -an array of individual arguments - ### Manual (default) adapter The Manual adapter is default. It takes inputs at face value. Constant inputs will @@ -106,6 +101,13 @@ may be of different lengths, and each job will be launched with a unique combina of array inputs. The manual adapter does not do any handling of outputs besides base delocalization. +* `product`: If false (the default) all inputs must be of equal length or a constant. +In this case, inputs will take one value per job. 
If true, arrays may be of unequal length +and one job will be launched for each combination of elements +* `arrays`: A list of input names which should be considered a bash array, instead of +an array of individual arguments + + Here is an example adapter configuration using the default settings: ```yaml adapter: From 124cf78ed6349c459a66b4acf3917c9b0d0afe12 Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Fri, 28 Feb 2020 11:07:24 -0500 Subject: [PATCH 04/10] Simplifying array handling --- canine/adapters/base.py | 18 ++++++++++++++---- canine/localization/base.py | 2 +- pipeline_options.md | 3 --- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/canine/adapters/base.py b/canine/adapters/base.py index dc24290c..15ec8f05 100644 --- a/canine/adapters/base.py +++ b/canine/adapters/base.py @@ -12,7 +12,10 @@ def __init__(self, items): @property def is_2d(self): - return len(self.items) > 0 and isinstance(self.items[0], list) + for item in self.items: + if not isinstance(item, list): + return False + return len(self.items) > 0 def __len__(self): return len(self.items) if self.is_2d else 1 @@ -87,7 +90,7 @@ class ManualAdapter(AbstractAdapter): Does pretty much nothing, except maybe combining arguments """ - def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, product: bool = False, arrays: typing.List[str] = None): + def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, product: bool = False, common_inputs: typing.List[str] = None): """ Initializes the adapter If product is True, array arguments will be combined, instead of co-iterated. 
@@ -96,11 +99,18 @@ def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, prod (the input variable to use as the alias) """ super().__init__(alias=alias) - self.arrays = arrays if arrays is not None else [] + self.common_inputs = common_inputs if common_inputs is not None else [] self.product = product self.__spec = None self._job_length = 0 + def pin_arrays(self, key, val): + pinned = _FixedArray(val) + if pinned.is_2d or key in self.common_inputs: + return pinned + return val + + def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing.List[typing.Any]]]) -> typing.Dict[str, typing.Dict[str, str]]: """ Takes raw user inputs and parses out actual inputs for each job @@ -110,7 +120,7 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. #Pin fixed arrays inputs = { - key: _FixedArray(val) if key in self.arrays else val + key: self.pin_arrays(key, val) for key,val in inputs.items() } diff --git a/canine/localization/base.py b/canine/localization/base.py index b1ec4e13..b91f5e86 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -429,7 +429,7 @@ def finalize_staging_dir(self, jobs: typing.Iterable[str], transport: typing.Opt def handle_input(self, jobId: str, input_name: str, input_value: typing.Union[str, typing.List[str]], common_dests: typing.Dict[str, str], overrides: typing.Dict[str, typing.Optional[str]], transport: AbstractTransport) -> Localization: """ Handles a singular input. - Localizes sraightforward files. Stream/Delay files are marked for later handling + Localizes straightforward files. 
Stream/Delay files are marked for later handling Returns Localization objects for the determined actions still required (Localization objects are handled during setup_teardown) Job array inputs have their elements individually handled here diff --git a/pipeline_options.md b/pipeline_options.md index d73b832a..4c4b7f9a 100644 --- a/pipeline_options.md +++ b/pipeline_options.md @@ -104,9 +104,6 @@ base delocalization. * `product`: If false (the default) all inputs must be of equal length or a constant. In this case, inputs will take one value per job. If true, arrays may be of unequal length and one job will be launched for each combination of elements -* `arrays`: A list of input names which should be considered a bash array, instead of -an array of individual arguments - Here is an example adapter configuration using the default settings: ```yaml From d5a8c6fdc9d6d149bf3eb6e88a76c30a339a2716 Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Fri, 28 Feb 2020 15:48:53 -0500 Subject: [PATCH 05/10] Fixed array handling --- canine/adapters/base.py | 55 +++++++++++++++++++++++------------- canine/adapters/firecloud.py | 3 +- canine/backends/base.py | 6 ++-- canine/backends/local.py | 4 +-- canine/backends/remote.py | 16 +++++++++-- canine/localization/base.py | 13 ++------- canine/localization/nfs.py | 5 ++-- canine/orchestrator.py | 28 +++++++++--------- pipeline_options.md | 2 ++ 9 files changed, 79 insertions(+), 53 deletions(-) diff --git a/canine/adapters/base.py b/canine/adapters/base.py index 15ec8f05..607e3fd0 100644 --- a/canine/adapters/base.py +++ b/canine/adapters/base.py @@ -10,6 +10,19 @@ class _FixedArray(object): def __init__(self, items): self.items = items + @staticmethod + def flatten(items): + """ + Flattens the array tp the specified depth. 
+ "Common" inputs only allowed 1D arrays + Otherwise only allowed 2D + """ + if not isinstance(items, list): + yield items + else: + for item in items: + yield from _FixedArray.flatten(item) + @property def is_2d(self): for item in self.items: @@ -37,18 +50,20 @@ def __getitem__(self, n): return elem raise ValueError("FixedArray is not 2d") - def stringify(self): - return [ - [str(item) for item in elem] if self.is_2d else str(elem) - for elem in self.items - ] + def stringify(self, two_d): + if two_d and self.is_2d: + return [ + [str(item) for item in _FixedArray.flatten(row)] + for row in self.items + ] + return [str(item) for item in _FixedArray.flatten(self.items)] class AbstractAdapter(abc.ABC): """ Base class for pipeline input adapters """ - def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None): + def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, common_inputs: typing.Optional[typing.Set[str]] = None): """ Initializes the adapter. If alias is provided, it is used to specify custom job aliases. 
@@ -56,6 +71,7 @@ def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None): (the input variable to use as the alias) """ self.alias = alias + self.common_inputs = common_inputs if common_inputs is not None else set() @abc.abstractmethod @@ -84,13 +100,19 @@ def spec(self) -> typing.Dict[str, typing.Dict[str, str]]: """ pass + def pin_arrays(self, key, val): + pinned = _FixedArray(val) + if pinned.is_2d or key in self.common_inputs: + return pinned + return val + class ManualAdapter(AbstractAdapter): """ Handles manual argument formatting Does pretty much nothing, except maybe combining arguments """ - def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, product: bool = False, common_inputs: typing.List[str] = None): + def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, product: bool = False, common_inputs: typing.Optional[typing.Set[str]] = None): """ Initializes the adapter If product is True, array arguments will be combined, instead of co-iterated. @@ -98,19 +120,11 @@ def __init__(self, alias: typing.Union[None, str, typing.List[str]] = None, prod alias may be a list of strings (an alias for each job) or a single string (the input variable to use as the alias) """ - super().__init__(alias=alias) - self.common_inputs = common_inputs if common_inputs is not None else [] + super().__init__(alias=alias, common_inputs=common_inputs) self.product = product self.__spec = None self._job_length = 0 - def pin_arrays(self, key, val): - pinned = _FixedArray(val) - if pinned.is_2d or key in self.common_inputs: - return pinned - return val - - def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing.List[typing.Any]]]) -> typing.Dict[str, typing.Dict[str, str]]: """ Takes raw user inputs and parses out actual inputs for each job @@ -123,11 +137,14 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. 
key: self.pin_arrays(key, val) for key,val in inputs.items() } + print(inputs) + import pdb; pdb.set_trace() + keys = sorted(inputs) input_lengths = { # FixedArrays return actual length if they are 2d - key: len(val) if isinstance(val, list) or (isinstance(val, _FixedArray) and val.is_2d) else 1 + key: len(val) if isinstance(val, list) or (isinstance(val, _FixedArray) and key not in self.common_inputs) else 1 for key, val in inputs.items() } @@ -160,7 +177,7 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. # XXX: simplify this with itertools.zip_longest() ? generator = zip(*[ inputs[key] if isinstance(inputs[key], list) else ( - iter(inputs[key]) if isinstance(inputs[key], _FixedArray) and inputs[key].is_2d else repeat(inputs[key], self._job_length) + iter(inputs[key]) if isinstance(inputs[key], _FixedArray) and key not in self.common_inputs else repeat(inputs[key], self._job_length) ) for key in keys ]) @@ -168,7 +185,7 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. 
str(i): { # Unpack fixed arrays here # From localizer perspective, any lists are intentionally fixed lists - key: val.stringify() if isinstance(val, _FixedArray) else str(val) + key: val.stringify(key not in self.common_inputs) if isinstance(val, _FixedArray) else str(val) for key, val in zip(keys, job) } for i, job in enumerate(generator) diff --git a/canine/adapters/firecloud.py b/canine/adapters/firecloud.py index 430017eb..efc1e78b 100644 --- a/canine/adapters/firecloud.py +++ b/canine/adapters/firecloud.py @@ -17,6 +17,7 @@ def __init__( self, workspace: str, entityType: str, entityName: str, entityExpression: typing.Optional[str] = None, write_to_workspace: bool = True, alias: typing.Union[None, str, typing.List[str]] = None, + common_inputs: typing.Optional[typing.Set[str]] = None ): """ Initializes the adapter @@ -30,7 +31,7 @@ def __init__( alias may be a list of strings (an alias for each job) or a single string (the input variable to use as the alias) """ - super().__init__(alias=alias) + super().__init__(alias=alias, common_inputs=common_inputs) self.workspace = dalmatian.WorkspaceManager(workspace) if entityName not in self.workspace._get_entities_internal(entityType).index: raise NameError('No such {} "{}" in workspace {}'.format( diff --git a/canine/backends/base.py b/canine/backends/base.py index a67f2c47..86f4ee27 100644 --- a/canine/backends/base.py +++ b/canine/backends/base.py @@ -91,7 +91,7 @@ def mkdir(self, path: str): pass @abc.abstractmethod - def stat(self, path: str) -> typing.Any: + def stat(self, path: str, follow_symlinks: bool = True) -> typing.Any: """ Returns stat information """ @@ -241,12 +241,12 @@ def _rmtree(self, path: str, pathstat: os.stat_result): for fname in self.listdir(path): fname = os.path.join(path, fname) try: - fstat = self.stat(fname) + fstat = self.stat(fname, follow_symlinks=False) except FileNotFoundError: # Handling for broken symlinks is bad self.remove(fname) else: - if stat.S_ISDIR(fstat.st_mode): + if 
stat.S_ISDIR(fstat.st_mode) and not stat.S_ISLNK(fstat.st_mode): self._rmtree( fname, fstat diff --git a/canine/backends/local.py b/canine/backends/local.py index fecd9e93..b8d07a9a 100644 --- a/canine/backends/local.py +++ b/canine/backends/local.py @@ -45,11 +45,11 @@ def mkdir(self, path: str): """ return os.mkdir(path) - def stat(self, path: str) -> typing.Any: + def stat(self, path: str, follow_symlinks: bool = True) -> typing.Any: """ Returns stat information """ - return os.stat(path) + return os.stat(path, follow_symlinks=follow_symlinks) def chmod(self, path: str, mode: int): """ diff --git a/canine/backends/remote.py b/canine/backends/remote.py index d5e39000..8f59b191 100644 --- a/canine/backends/remote.py +++ b/canine/backends/remote.py @@ -84,13 +84,25 @@ def mkdir(self, path: str): raise paramiko.SSHException("Transport is not connected") return self.session.mkdir(path) - def stat(self, path: str) -> typing.Any: + def stat(self, path: str, follow_symlinks: bool = True) -> typing.Any: """ Returns stat information """ if self.session is None: raise paramiko.SSHException("Transport is not connected") - return self.session.stat(path) + if follow_symlinks: + return self.session.stat(path) + else: + try: + dirname = os.path.dirname(path) + if dirname == '': + dirname = '.' 
+ return { + attr.filename: attr + for attr in self.session.listdir_attr() + }[os.path.basename(path)] + except KeyError as e: + raise FileNotFoundError(path) from e def chmod(self, path: str, mode: int): """ diff --git a/canine/localization/base.py b/canine/localization/base.py index c28f608d..181405a0 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -67,7 +67,6 @@ def __init__( # self.staging_dir # )) self.inputs = {} # {jobId: {inputName: (handle type, handle value)}} - self.clean_on_exit = True self.project = project if project is not None else get_default_gcp_project() def get_requester_pays(self, path: str) -> bool: @@ -357,8 +356,8 @@ def delocalize(self, patterns: typing.Dict[str, str], output_dir: str = 'canine_ for outputname in os.listdir(start_dir): dirpath = os.path.join(start_dir, outputname) if os.path.isdir(dirpath): - if outputname not in patterns: - warnings.warn("Detected output directory {} which was not declared".format(dirpath)) + # if outputname not in patterns: + # warnings.warn("Detected output directory {} which was not declared".format(dirpath)) output_files[jobId][outputname] = glob.glob(os.path.join(dirpath, patterns[outputname])) elif outputname in {'stdout', 'stderr'} and os.path.isfile(dirpath): output_files[jobId][outputname] = [dirpath] @@ -453,7 +452,7 @@ def handle_input(self, jobId: str, input_name: str, input_value: typing.Union[st if input_value in common_dests or mode == 'common': # common override already handled # No localization needed - return Localization(None, common_dests[input_value]) + return Localization(None, common_dests[input_value] if input_value in common_dests else input_value) elif mode is not False: # User has specified an override if mode == 'stream': if input_value.startswith('gs://'): @@ -610,12 +609,6 @@ def __exit__(self, *args): """ if self._local_dir is not None: self._local_dir.cleanup() - if self.clean_on_exit: - try: - with self.backend.transport() as transport: - 
transport.rmdir(self.staging_dir) - except: - pass @abc.abstractmethod def localize_file(self, src: str, dest: PathType, transport: typing.Optional[AbstractTransport] = None): diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 46937a97..f6200b55 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -60,7 +60,6 @@ def __init__( self.mount_path = transport.normpath(mount_path if mount_path is not None else staging_dir) self.staging_dir = self.mount_path self.inputs = {} # {jobId: {inputName: (handle type, handle value)}} - self.clean_on_exit = True self.project = project if project is not None else get_default_gcp_project() def localize_file(self, src: str, dest: PathType, transport: typing.Optional[AbstractTransport] = None): @@ -173,8 +172,8 @@ def delocalize(self, patterns: typing.Dict[str, str], output_dir: typing.Optiona for outputname in os.listdir(start_dir): dirpath = os.path.join(start_dir, outputname) if os.path.isdir(dirpath): - if outputname not in patterns: - warnings.warn("Detected output directory {} which was not declared".format(dirpath)) + # if outputname not in patterns: + # warnings.warn("Detected output directory {} which was not declared".format(dirpath)) output_files[jobId][outputname] = glob.glob(os.path.join(dirpath, patterns[outputname])) elif outputname in {'stdout', 'stderr'} and os.path.isfile(dirpath): output_files[jobId][outputname] = [dirpath] diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 77bae371..787ca377 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -138,14 +138,28 @@ def __init__(self, config: typing.Union[ self.raw_inputs = Orchestrator.stringify(config['inputs']) if 'inputs' in config else {} self.resources = Orchestrator.stringify(config['resources']) if 'resources' in config else {} + # + # localizer + self.localizer_args = config['localization'] if 'localization' in config else {} + if self.localizer_args['strategy'] not in LOCALIZERS: + 
raise ValueError("Unknown localization strategy '{}'".format(self.localizer_args)) + self._localizer_type = LOCALIZERS[self.localizer_args['strategy']] + self.localizer_overrides = {} + if 'overrides' in self.localizer_args: + self.localizer_overrides = {**self.localizer_args['overrides']} + # # adapter adapter = config['adapter'] if adapter['type'] not in ADAPTERS: raise ValueError("Unknown adapter type '{type}'".format(**adapter)) self._adapter_type=adapter['type'] - self.adapter = ADAPTERS[adapter['type']](**{arg:val for arg,val in adapter.items() if arg != 'type'}) + self.adapter = ADAPTERS[adapter['type']]( + common_inputs={arg for arg, val in self.localizer_overrides.items() if val.lower() in {'common', 'array'}}, + **{arg:val for arg,val in adapter.items() if arg != 'type'} + ) self.job_spec = self.adapter.parse_inputs(self.raw_inputs) + print(self.job_spec) # # backend @@ -156,16 +170,6 @@ def __init__(self, config: typing.Union[ self._slurmconf_path = backend['slurm_conf_path'] if 'slurm_conf_path' in backend else None self.backend = BACKENDS[self._backend_type](**backend) - # - # localizer - self.localizer_args = config['localization'] if 'localization' in config else {} - if self.localizer_args['strategy'] not in LOCALIZERS: - raise ValueError("Unknown localization strategy '{}'".format(self.localizer_args)) - self._localizer_type = LOCALIZERS[self.localizer_args['strategy']] - self.localizer_overrides = {} - if 'overrides' in self.localizer_args: - self.localizer_overrides = {**self.localizer_args['overrides']} - # # outputs self.output_map = {} @@ -218,7 +222,6 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) entrypoint_path = self.localize_inputs_and_script(localizer) if dry_run: - localizer.clean_on_exit = False return self.job_spec print("Waiting for cluster to finish startup...") @@ -272,7 +275,6 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) except: print("Encountered 
unhandled exception. Cancelling batch job", file=sys.stderr) self.backend.scancel(batch_id) - localizer.clean_on_exit = False raise finally: # Check if fully job-avoided so we still delocalize diff --git a/pipeline_options.md b/pipeline_options.md index 4c4b7f9a..e00f6218 100644 --- a/pipeline_options.md +++ b/pipeline_options.md @@ -480,6 +480,8 @@ The file will only be localized once. If the job is restarted (for instance, if already finish * `Common`: Forces the input to be localized to the common directory. This will override default common behavior (the file will always be localized to the `$CANINE_COMMON` directory) + * **NOTE:** `Common` can also be used to explicitly declare that an input which is a list should be shared across + all jobs, instead of one element per job * `null`: Forces the input to be treated as a plain string. No handling whatsoever will be applied to the input. From 9fe642c65eb88f71122f3db7944a1bd19cdb2e8f Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Mon, 2 Mar 2020 17:49:27 -0500 Subject: [PATCH 06/10] Arrays now handled via common --- canine/adapters/base.py | 3 --- canine/localization/base.py | 5 ----- 2 files changed, 8 deletions(-) diff --git a/canine/adapters/base.py b/canine/adapters/base.py index 607e3fd0..47ac7c3d 100644 --- a/canine/adapters/base.py +++ b/canine/adapters/base.py @@ -137,9 +137,6 @@ def parse_inputs(self, inputs: typing.Dict[str, typing.Union[typing.Any, typing. 
key: self.pin_arrays(key, val) for key,val in inputs.items() } - print(inputs) - import pdb; pdb.set_trace() - keys = sorted(inputs) input_lengths = { diff --git a/canine/localization/base.py b/canine/localization/base.py index 181405a0..2bab8388 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -389,17 +389,12 @@ def pick_common_inputs(self, inputs: typing.Dict[str, typing.Dict[str, typing.Un for jobId, values in inputs.items(): for arg, paths in values.items(): if arg not in overrides: - print(arg, "is not overridden; scanning inputs") for path in self.flatten_inputs(paths): if path in seen: - print("already encountered", path, " (adding to common)") self.common_inputs.add(path) else: - print("first encounter of", path) seen.add(path) elif arg not in seen_forced and overrides[arg] == 'common': - print("Arg", arg, "not already forced") - print("Forcing", arg, [*self.flatten_inputs(paths)]) seen_forced.add(arg) self.common_inputs |= {*self.flatten_inputs(paths)} common_dests = {} From a60458f48f41e6713f6d89bb3550acfb3171dad5 Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Tue, 10 Mar 2020 11:30:34 -0400 Subject: [PATCH 07/10] Fixed common-file overrides --- canine/localization/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index 6f00c5c3..703a3fb3 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -395,7 +395,7 @@ def pick_common_inputs(self, inputs: typing.Dict[str, typing.Dict[str, typing.Un self.common_inputs.add(path) else: seen.add(path) - elif arg not in seen_forced and overrides[arg] == 'common': + elif overrides[arg] == 'common': seen_forced.add(arg) self.common_inputs |= {*self.flatten_inputs(paths)} common_dests = {} From 1bce1f7b089838429d024ea4e2e1730b59c7c71b Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Tue, 10 Mar 2020 11:54:21 -0400 Subject: [PATCH 08/10] Added unit test for array inputs --- 
canine/test/test_adapter_manual.py | 43 ++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/canine/test/test_adapter_manual.py b/canine/test/test_adapter_manual.py index f7cbee6c..5b5deac2 100644 --- a/canine/test/test_adapter_manual.py +++ b/canine/test/test_adapter_manual.py @@ -15,6 +15,49 @@ class TestUnit(unittest.TestCase): Runs tests on various utilities """ + def test_arrays(self): + for test in range(15): + with self.subTest(test=test): + raw_inputs = { + os.urandom(8).hex(): [os.urandom(8).hex() for _ in range(test+1)] + for _ in range(test+1) + } + twodkeys = [] + for i in range(test+1): + twodkeys.append(os.urandom(8).hex()) + raw_inputs[twodkeys[-1]] = [ + [os.urandom(8).hex() for x in range(y+1)] + for y in range(test+1) + ] + commkeys = [] + for i in range(test+1): + commkeys.append(os.urandom(8).hex()) + raw_inputs[commkeys[-1]] = [os.urandom(8).hex() for _ in range(test+1)] + + inputs = ManualAdapter(common_inputs=commkeys).parse_inputs({k:v for k,v in raw_inputs.items()}) + + self.assertEqual(test+1, len(inputs)) + + for i, jid in enumerate(inputs): + for k,v in inputs[jid].items(): + if k in twodkeys: + self.assertIsInstance(v, list) + self.assertListEqual( + v, + raw_inputs[k][i] + ) + elif k in commkeys: + self.assertIsInstance(v, list) + self.assertListEqual( + v, + raw_inputs[k] + ) + else: + self.assertEqual( + v, + raw_inputs[k][i] + ) + def test_regular(self): for test in range(15): with self.subTest(test=test): From d2a9948f0bfadd0fc05e8bcdfba044e9e9911125 Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Wed, 13 May 2020 17:11:38 -0400 Subject: [PATCH 09/10] Added new unit tests for arrays and local downloads --- canine/localization/base.py | 98 +++++++------- canine/test/test_localizer_batched.py | 177 ++++++++++++++++++++++++++ 2 files changed, 221 insertions(+), 54 deletions(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index 333e8f12..4b63f4bf 100644 --- 
a/canine/localization/base.py +++ b/canine/localization/base.py @@ -28,19 +28,6 @@ ['localpath', 'remotepath'] ) -JobConf = namedtuple( - 'JobConf', - [ - 'docker_args', - 'stream_dir_ready', - 'local_disk_name', - 'job_vars', - 'setup', - 'localization', - 'teardown' - ] -) - class AbstractLocalizer(abc.ABC): """ Base class for localization. @@ -518,10 +505,13 @@ def handle_input(self, jobId: str, input_name: str, input_value: typing.Union[st print("Ignoring 'delayed' override for", input_name, "with value", input_value, "and localizing now", file=sys.stderr) elif mode == 'local': if input_value.startswith('gs://'): + self.local_download_size[jobId] = self.local_download_size.get(jobId, 0) + self.get_object_size(input_value) return Localization('local', input_value) - print("Ignoring 'local' override for", arg, "with value", value, "and localizing now", file=sys.stderr) + print("Ignoring 'local' override for", input_name, "with value", input_value, "and localizing now", file=sys.stderr) elif mode is None or mode == 'null': return Localization(None, input_value) + else: + print("Unrecognized localization override:", mode) # At this point, no overrides have taken place, so handle by default if os.path.exists(input_value) or input_value.startswith('gs://'): remote_path = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(input_value))) @@ -555,19 +545,19 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, typing.Uni transport ) - def final_localization(self, jobId: str, request: Localization, conf: JobConf) -> str: + def final_localization(self, jobId: str, request: Localization, job_conf: typing.Dict[str, typing.Any]) -> str: """ Modifies job conf as final localization commands are processed Exported variable values are not added to conf and must be added manually, later """ if request.type == 'stream': - if not job_conf.stream_dir_ready: - job_conf.setup.append('export CANINE_STREAM_DIR=$(mktemp -d 
/tmp/canine_streams.$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID.XXXX)') - job_conf.docker_args.append('-v $CANINE_STREAM_DIR:$CANINE_STREAM_DIR') - job_conf.teardown.append('if [[ -n "$CANINE_STREAM_DIR" ]]; then rm -rf $CANINE_STREAM_DIR; fi') - job_conf.stream_dir_ready = True + if not job_conf['stream_dir_ready']: + job_conf['setup'].append('export CANINE_STREAM_DIR=$(mktemp -d /tmp/canine_streams.$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID.XXXX)') + job_conf['docker_args'].append('-v $CANINE_STREAM_DIR:$CANINE_STREAM_DIR') + job_conf['teardown'].append('if [[ -n "$CANINE_STREAM_DIR" ]]; then rm -rf $CANINE_STREAM_DIR; fi') + job_conf['stream_dir_ready'] = True dest = os.path.join('$CANINE_STREAM_DIR', os.path.basename(os.path.abspath(request.path))) - job_conf.localization += [ + job_conf['localization'] += [ 'gsutil ls {} > /dev/null'.format(shlex.quote(request.path)), 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest), 'mkfifo {}'.format(dest), @@ -583,8 +573,8 @@ def final_localization(self, jobId: str, request: Localization, conf: JobConf) - dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(request.path))) else: # Local and controller paths not needed on this object - dest = PathType(None, os.path.join('$CANINE_LOCAL_DISK_DIR', os.path.basename(val.path))) - job_conf.localization += [ + dest = PathType(None, os.path.join('$CANINE_LOCAL_DISK_DIR', os.path.basename(request.path))) + job_conf['localization'] += [ "if [[ ! 
-e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(request.path) else '', shlex.quote(request.path), @@ -593,7 +583,7 @@ def final_localization(self, jobId: str, request: Localization, conf: JobConf) - ] return dest.remotepath elif request.type == None: - return [], shlex.quote(request.path.computepath if isinstance(request.path, PathType) else request.path) + return shlex.quote(request.path.remotepath if isinstance(request.path, PathType) else request.path) raise TypeError("request type '{}' not supported at this stage".format(request.type)) def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str, str]: @@ -606,15 +596,15 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ # - job variables and exports are set when setup.sh is _sourced_ # - localization tasks are run when localization.sh is _run_ - job_conf = JobConf( - ['-v $CANINE_ROOT:$CANINE_ROOT'], # Docker args - False, # is stream configured - None, # Job local disk name - [], # Job variables - [], # setup tasks - ['if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi'], # localization tasks - [], #teardown tasks - ) + job_conf = { + 'docker_args': ['-v $CANINE_ROOT:$CANINE_ROOT'], # Docker args + 'stream_dir_ready': False, # is stream configured + 'local_disk_name': None, # Job local disk name + 'job_vars': [], # Job variables + 'setup': [], # setup tasks + 'localization': ['if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi'], # localization tasks + 'teardown': [], #teardown tasks + } local_download_size = self.local_download_size.get(jobId, 0) if local_download_size > 0 and self.temporary_disk_type is not None: @@ -624,29 +614,29 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ ) if local_download_size > 65535: raise ValueError("Cannot provision 
{} GB disk for job {}".format(local_download_size, jobId)) - job_conf.local_disk_name = 'canine-{}-{}-{}'.format(self.disk_key, os.urandom(4).hex(), jobId) + job_conf['local_disk_name'] = 'canine-{}-{}-{}'.format(self.disk_key, os.urandom(4).hex(), jobId) device_name = 'cn{}{}'.format(os.urandom(2).hex(), jobId) - job_conf.setup += [ + job_conf['setup'] += [ 'export CANINE_LOCAL_DISK_SIZE={}GB'.format(local_download_size), 'export CANINE_LOCAL_DISK_TYPE={}'.format(self.temporary_disk_type), 'export CANINE_NODE_NAME=$(curl -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/name)', 'export CANINE_NODE_ZONE=$(basename $(curl -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone))', - 'export CANINE_LOCAL_DISK_DIR={}/{}'.format(self.local_download_dir, job_conf.local_disk_name), + 'export CANINE_LOCAL_DISK_DIR={}/{}'.format(self.local_download_dir, job_conf['local_disk_name']), ] - job_conf.localization += [ + job_conf['localization'] += [ 'sudo mkdir -p $CANINE_LOCAL_DISK_DIR', 'if [[ -z "$CANINE_NODE_NAME" ]]; then echo "Unable to provision disk (not running on GCE instance). 
Attempting download to directory on boot disk" > /dev/stderr; else', - 'echo Provisioning and mounting temporary disk {}'.format(job_conf.local_disk_name), + 'echo Provisioning and mounting temporary disk {}'.format(job_conf['local_disk_name']), 'gcloud compute disks create {} --size {} --type pd-{} --zone $CANINE_NODE_ZONE'.format( - job_conf.local_disk_name, + job_conf['local_disk_name'], local_download_size, self.temporary_disk_type ), 'gcloud compute instances attach-disk $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {} --device-name {}'.format( - job_conf.local_disk_name, + job_conf['local_disk_name'], device_name ), - 'gcloud compute instances set-disk-auto-delete $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {}'.format(job_conf.local_disk_name), + 'gcloud compute instances set-disk-auto-delete $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {}'.format(job_conf['local_disk_name']), 'sudo mkfs.ext4 -m 0 -E lazy_itable_init=0,lazy_journal_init=0,discard /dev/disk/by-id/google-{}'.format(device_name), 'sudo mount -o discard,defaults /dev/disk/by-id/google-{} $CANINE_LOCAL_DISK_DIR'.format( @@ -655,19 +645,19 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'sudo chmod -R a+rwX {}'.format(self.local_download_dir), 'fi' ] - job_conf.teardown += [ - 'sudo umount {}/{}'.format(self.local_download_dir, job_conf.local_disk_name), - 'gcloud compute instances detach-disk $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {}'.format(job_conf.local_disk_name), - 'gcloud compute disks delete {} --zone $CANINE_NODE_ZONE'.format(job_conf.local_disk_name) + job_conf['teardown'] += [ + 'sudo umount {}/{}'.format(self.local_download_dir, job_conf['local_disk_name']), + 'gcloud compute instances detach-disk $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {}'.format(job_conf['local_disk_name']), + 'gcloud compute disks delete {} --zone $CANINE_NODE_ZONE'.format(job_conf['local_disk_name']) ] - job_conf.docker_args.append('-v 
$CANINE_LOCAL_DISK_DIR:$CANINE_LOCAL_DISK_DIR') + job_conf['docker_args'].append('-v $CANINE_LOCAL_DISK_DIR:$CANINE_LOCAL_DISK_DIR') compute_env = self.environment('remote') stream_dir_ready = False for key, val in self.inputs[jobId].items(): - job_conf.job_vars.append(shlex.quote(key)) + job_conf['job_vars'].append(shlex.quote(key)) if val.type == 'array': # Hack: the array elements are exposed here as the Localization arg's .path attr - job_conf.setup.append( + job_conf['setup'].append( # Should we manually quote here instead of using shlex? # Wondering if the quoting might break some directives which embed environment variables in the path 'export {}={}'.format(key, shlex.quote('\t'.join( @@ -676,28 +666,28 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ ))) ) else: - job_conf.setup.append( + job_conf['setup'].append( 'export {}={}'.format(key, self.final_localization(jobId, val, job_conf)) ) setup_script = '\n'.join( line.rstrip() for line in [ '#!/bin/bash', - 'export CANINE_JOB_VARS={}'.format(':'.join(job_conf.job_vars)), + 'export CANINE_JOB_VARS={}'.format(':'.join(job_conf['job_vars'])), 'export CANINE_JOB_INPUTS="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'inputs')), 'export CANINE_JOB_ROOT="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), 'export CANINE_JOB_SETUP="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'setup.sh')), 'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')), 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', - ] + job_conf.setup - ) + '\nexport CANINE_DOCKER_ARGS="{docker}"\ncd $CANINE_JOB_ROOT\n'.format(docker=' '.join(job_conf.docker_args)) + ] + job_conf['setup'] + ) + '\nexport CANINE_DOCKER_ARGS="{docker}"\ncd $CANINE_JOB_ROOT\n'.format(docker=' '.join(job_conf['docker_args'])) # generate localization script localization_script = '\n'.join([ "#!/bin/bash", "set -e" - ] + 
job_conf.localization) + "\nset +e\n" + ] + job_conf['localization']) + "\nset +e\n" # generate teardown script teardown_script = '\n'.join( @@ -715,7 +705,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ for name, pattern in patterns.items() ) ), - ] + job_conf.teardown + ] + job_conf['teardown'] ) return setup_script, localization_script, teardown_script diff --git a/canine/test/test_localizer_batched.py b/canine/test/test_localizer_batched.py index 6c969538..4b833d4f 100644 --- a/canine/test/test_localizer_batched.py +++ b/canine/test/test_localizer_batched.py @@ -5,6 +5,7 @@ import stat import warnings import time +import random from contextlib import contextmanager from canine.backends.dummy import DummySlurmBackend from canine.localization.base import Localization, PathType @@ -137,6 +138,182 @@ class TestIntegration(unittest.TestCase): Tests high-level features of the localizer """ + @with_timeout(10) + def test_local_download(self): + with BatchedLocalizer(BACKEND) as localizer: + localizer.get_object_size = unittest.mock.MagicMock(side_effect=lambda *a,**k:random.randint(1,100000000000)) + inputs = { + str(jid): { + 'file': 'gs://foo/'+os.urandom(8).hex() + } + for jid in range(15) + } + overrides = {'file': 'local'} + with localizer.transport_context() as transport: + common_dests = localizer.pick_common_inputs( + inputs, + overrides, + transport + ) + for jobId, data in inputs.items(): + with self.subTest(jid=jobId): + os.makedirs(os.path.join( + localizer.environment('local')['CANINE_JOBS'], + jobId, + )) + localizer.prepare_job_inputs( + jobId, + data, + common_dests, + overrides, + transport=transport + ) + self.assertTupleEqual( + localizer.inputs[jobId]['file'], + Localization('local', inputs[jobId]['file']) + ) + + setup_text, localization_text, teardown_text = localizer.job_setup_teardown(jobId, {}) + self.assertIn('export CANINE_LOCAL_DISK_DIR', setup_text) + self.assertIn( + '{} 
$CANINE_LOCAL_DISK_DIR/{}'.format(inputs[jobId]['file'], os.path.basename(inputs[jobId]['file'])), + localization_text + ) + self.assertIn('gcloud compute disks delete', teardown_text) + + @with_timeout(10) + def test_array_inputs(self): + with tempfile.TemporaryDirectory() as tempdir: + common_file = makefile(os.path.join(tempdir, 'commfile')) + common_array = [ + 'string', common_file, 'gs://foo/bar' + ] + with BatchedLocalizer(BACKEND) as localizer: + localizer.localize_file = unittest.mock.MagicMock() + inputs = { + str(jid): { + 'array-common': common_array, + 'array-incommon': [os.urandom(8).hex(), makefile(os.path.join(tempdir, os.urandom(8).hex())), 'gs://foo/'+os.urandom(8).hex()], + 'array-stream': ['gs://foo/'+os.urandom(8).hex(), 'gs://foo/'+os.urandom(8).hex()], + 'array-download': ['gs://foo/'+os.urandom(8).hex(), 'gs://foo/'+os.urandom(8).hex()] + } + for jid in range(15) + } + overrides = { + 'array-common': 'common', + 'array-stream': 'stream', + 'array-download': 'delayed' + } + with localizer.transport_context() as transport: + common_dests = localizer.pick_common_inputs( + inputs, + overrides, + transport + ) + self.assertIn(common_file, common_dests) + self.assertIn('gs://foo/bar', common_dests) + + for jobId, data in inputs.items(): + with self.subTest(jid=jobId): + os.makedirs(os.path.join( + localizer.environment('local')['CANINE_JOBS'], + jobId, + )) + localizer.prepare_job_inputs( + jobId, + data, + common_dests, + overrides, + transport=transport + ) + + with self.subTest(arg='array-common'): + self.assertTupleEqual( + localizer.inputs[jobId]['array-common'], + Localization( + 'array', + [ + Localization(None, 'string'), + Localization(None, localizer.reserve_path('common', 'commfile')), + Localization(None, localizer.reserve_path('common', 'bar')) + ] + ) + ) + + localizer.localize_file.assert_any_call( + common_file, + localizer.reserve_path('common', 'commfile'), + transport=transport + ) + + localizer.localize_file.assert_any_call( + 
'gs://foo/bar', + localizer.reserve_path('common', 'bar'), + transport=transport + ) + + with self.subTest(arg='array-incommon'): + self.assertTupleEqual( + localizer.inputs[jobId]['array-incommon'], + Localization( + 'array', + [ + Localization(None, inputs[jobId]['array-incommon'][0]), + Localization(None, localizer.reserve_path('jobs', jobId, 'inputs', os.path.basename(inputs[jobId]['array-incommon'][1]))), + Localization(None, localizer.reserve_path('jobs', jobId, 'inputs', os.path.basename(inputs[jobId]['array-incommon'][2]))), + ] + ) + ) + + localizer.localize_file.assert_any_call( + inputs[jobId]['array-incommon'][1], + localizer.reserve_path('jobs', jobId, 'inputs', os.path.basename(inputs[jobId]['array-incommon'][1])), + transport=transport + ) + + localizer.localize_file.assert_any_call( + inputs[jobId]['array-incommon'][2], + localizer.reserve_path('jobs', jobId, 'inputs', os.path.basename(inputs[jobId]['array-incommon'][2])), + transport=transport + ) + + with self.subTest(arg='array-stream'): + self.assertTupleEqual( + localizer.inputs[jobId]['array-stream'], + Localization( + 'array', + [ + Localization('stream', inputs[jobId]['array-stream'][0]), + Localization('stream', inputs[jobId]['array-stream'][1]), + ] + ) + ) + + + with self.subTest(arg='array-download'): + self.assertTupleEqual( + localizer.inputs[jobId]['array-download'], + Localization( + 'array', + [ + Localization('download', inputs[jobId]['array-download'][0]), + Localization('download', inputs[jobId]['array-download'][1]), + ] + ) + ) + + setup_text, localization_text, teardown_text = localizer.job_setup_teardown(jobId, {}) + self.assertIn( + "export array-common='{}'".format( + '\t'.join([ + 'string', + localizer.reserve_path('common', 'commfile').remotepath, + localizer.reserve_path('common', 'bar').remotepath + ]) + ), + setup_text + ) + @with_timeout(10) def test_common_inputs(self): with tempfile.TemporaryDirectory() as tempdir: From 1c13db215acbd21161159b100e02a9cfaf3b3812 Mon 
Sep 17 00:00:00 2001 From: Aaron Graubert Date: Wed, 13 May 2020 18:22:31 -0400 Subject: [PATCH 10/10] Added -u option to gsutil ls during stream precheck --- canine/localization/base.py | 7 +++++-- canine/orchestrator.py | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index 4b63f4bf..d7cb8a86 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -558,7 +558,10 @@ def final_localization(self, jobId: str, request: Localization, job_conf: typing job_conf['stream_dir_ready'] = True dest = os.path.join('$CANINE_STREAM_DIR', os.path.basename(os.path.abspath(request.path))) job_conf['localization'] += [ - 'gsutil ls {} > /dev/null'.format(shlex.quote(request.path)), + 'gsutil {} ls {} > /dev/null'.format( + '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(request.path) else '', + shlex.quote(request.path) + ), 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest), 'mkfifo {}'.format(dest), "gsutil {} cat {} > {} &".format( @@ -615,7 +618,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ if local_download_size > 65535: raise ValueError("Cannot provision {} GB disk for job {}".format(local_download_size, jobId)) job_conf['local_disk_name'] = 'canine-{}-{}-{}'.format(self.disk_key, os.urandom(4).hex(), jobId) - device_name = 'cn{}{}'.format(os.urandom(2).hex(), jobId) + device_name = 'k9{}{}'.format(os.urandom(2).hex(), jobId) job_conf['setup'] += [ 'export CANINE_LOCAL_DISK_SIZE={}GB'.format(local_download_size), 'export CANINE_LOCAL_DISK_TYPE={}'.format(self.temporary_disk_type), diff --git a/canine/orchestrator.py b/canine/orchestrator.py index b1e97224..4fa8ac1a 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -164,7 +164,6 @@ def __init__(self, config: typing.Union[ **{arg:val for arg,val in adapter.items() if arg != 'type'} ) self.job_spec = self.adapter.parse_inputs(self.raw_inputs) - 
print(self.job_spec) # # backend