Merge pull request #14 from djf604/v0.1.5-batch-run
V0.1.5 batch run
Dominic Fitzgerald committed Mar 27, 2018
2 parents ed55c1a + b0a6b82 commit b4830c5
Showing 6 changed files with 273 additions and 31 deletions.
4 changes: 2 additions & 2 deletions docs/conf.py
@@ -56,9 +56,9 @@
# built documents.
#
# The short X.Y version.
version = '0.1'
version = '0.1.5'
# The full version, including alpha/beta/rc tags.
release = '0.1.0'
release = '0.1.5'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
58 changes: 54 additions & 4 deletions docs/using_operon.rst
@@ -87,9 +87,9 @@ To run an installed pipeline::
The set of accepted ``pipeline-options`` is defined by the pipeline itself; these options are meant to be values that change from
run to run, such as input files, metadata, etc. Three options will always exist, as shown in the example below:

* ``pipeline-config`` can point to a pipeline config to use for this run only
* ``parsl-config`` can point to a file containing JSON that represents a Parsl config to use for this run only
* ``logs-dir`` can point to a location where log files from this run should be deposited; if it doesn't exist, it
* ``--pipeline-config`` can point to a pipeline config to use for this run only
* ``--parsl-config`` can point to a file containing JSON that represents a Parsl config to use for this run only
* ``--logs-dir`` can point to a location where log files from this run should be deposited; if it doesn't exist, it
will be created; defaults to the current directory
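
For example, a hypothetical invocation overriding all three (the pipeline name and paths are placeholders) might look like::

    $ operon run my-pipeline --pipeline-config /path/to/pipeline_config.json \
                             --parsl-config /path/to/parsl_config.json \
                             --logs-dir /path/to/logs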

When an Operon pipeline is run, under the hood it creates a Parsl workflow which can be run in many different ways
@@ -113,7 +113,57 @@ The ``run`` subprogram attempts to pull a Parsl configuration from the user in t
4. A default parsl configuration provided by the pipeline
5. A package default parsl configuration of 2 workers using Python threads

For more detailed information, refer to AnotherPage
For more detailed information, refer to the
`Parsl documentation <http://parsl.readthedocs.io/en/latest/userguide/configuring.html>`_ on the subject.
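
The resolution simply takes the first source that is actually provided. The sketch below illustrates the idea; the
function and parameter names are illustrative only and not part of Operon's API (the real logic lives in
``_get_dfk()`` in ``operon/components.py``).

.. code-block:: python

    def resolve_parsl_config(cli_config, pipeline_config, install_default,
                             pipeline_default, package_default):
        # Walk the sources in priority order and return the first one present
        for candidate in (cli_config, pipeline_config, install_default, pipeline_default):
            if candidate:
                return candidate
        # Fall back to the package default (a small local thread pool)
        return package_default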

Run a Pipeline in Batch
^^^^^^^^^^^^^^^^^^^^^^^
A common use case is to run many samples or input units independently through the same pipeline. The ``batch-run``
subcommand supports this use case and gives the whole run a common pool of resources::

$ operon batch-run <pipeline-name> --input-matrix INPUT_MATRIX [--pipeline-config CONFIG] \
[--parsl-config CONFIG] [--logs-dir DIR]

Operon treats a ``batch-run`` as a single large workflow which happens to contain many disjoint sub-workflows. Every
node in the workflow graph is given equal access to a pool of resources so that those resources are used as efficiently as possible.

Input Matrix
************
Inputs for a ``batch-run`` are not passed on the command line; instead they are pre-gathered into a tab-separated
matrix file of a specific format. The following formats are supported:

With Headers
------------
The header line should be a tab-separated list of command line argument flags in the same format as one would use when
typing directly on the command line. Optional arguments should use their verbatim flags, and positional arguments
should use the form ``positional_i``, where ``i`` is the position from left-most to right-most. Subsequent lines
should have the same number of tab-separated items, where each item is the value for its corresponding header.

Singleton arguments (where the flag's presence or absence denotes its value) can be specified in their affirmative form
in the header line. The value given should be either ``true`` or ``false``, which determines whether the flag is
included for that run.

.. code-block:: text

    --arg1    --inputs              --singleton    positional_0    positional_1
    val1      /path/to/input1       true           apples          blue
    val3      /path/to/inputN       true           strawberries    green
    val2      /path/to/inputABB     false          kale            purple

.. note::

    If the literal string ``"true"`` or ``"false"`` is needed, preface with a ``#`` as in ``#true``.
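
As an illustration, the first data row of the example above expands to the following argument list. This is a
simplified sketch that mirrors the parsing logic added in ``operon/_cli/subcommands/batch_run.py``; it is not part of
the public API.

.. code-block:: python

    headers = ['--arg1', '--inputs', '--singleton', 'positional_0', 'positional_1']
    row = ['val1', '/path/to/input1', 'true', 'apples', 'blue']

    positionals, optionals = [], []
    for header, value in zip(headers, row):
        if header.startswith('positional_'):
            positionals.append((int(header.split('_')[-1]), value))
        elif value.lower() == 'true':                # singleton flag: include the flag only
            optionals.append(header)
        elif value.lower() in {'#true', '#false'}:   # literal 'true'/'false' value requested
            optionals.extend([header, value.strip('#')])
        elif value.lower() != 'false':               # ordinary optional argument
            optionals.extend([header] + value.split())

    argv = optionals + [value for _, value in sorted(positionals)]
    # argv == ['--arg1', 'val1', '--inputs', '/path/to/input1', '--singleton', 'apples', 'blue']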

Without Headers
---------------
If the flag ``--literal-input`` is given to ``batch-run``, then the header line does not need to exist and each line
is taken as a literal command line string, interpreted as if it had been typed directly on the command line.

.. code-block:: text

    --arg1 val1 --inputs /path/to/input1 --singleton apples blue
    --arg1 val3 --inputs /path/to/inputN --singleton strawberries green
    --arg1 val2 --inputs /path/to/inputABB kale purple
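
A hypothetical invocation using such a matrix (the pipeline and file names are placeholders) could then be::

    $ operon batch-run my-pipeline --input-matrix samples.tsv --literal-input
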
Command Line Help
^^^^^^^^^^^^^^^^^
2 changes: 1 addition & 1 deletion operon/_cli/_completer.py
@@ -42,7 +42,7 @@ def completer():
completion_options = ''
if num_completed_tokens == 1:
completion_options = get_completion_options(
options=' '.join(get_operon_subcommands()),
options=' '.join(get_operon_subcommands()).replace('_', '-'),
stub=stub_token
)
elif num_completed_tokens == 2:
138 changes: 138 additions & 0 deletions operon/_cli/subcommands/batch_run.py
@@ -0,0 +1,138 @@
import os
import sys
import argparse

from operon._cli.subcommands import BaseSubcommand
from operon._util.configs import parse_pipeline_config

ARGV_FIRST_ARGUMENT = 0
ARGV_PIPELINE_NAME = 0
EXIT_CMD_SUCCESS = 0
EXIT_CMD_SYNTAX_ERROR = 2


def usage():
return 'operon batch-run <pipeline-name> [-h] --input-matrix <input_matrix> [--separate-pools]'


class Subcommand(BaseSubcommand):
def help_text(self):
return 'Run a pipeline in batch with a shared resource pool.'

def run(self, subcommand_args):
# Get pipeline name or output help
parser = argparse.ArgumentParser(prog='operon batch-run', usage=usage(), description=self.help_text())
if not subcommand_args or subcommand_args[ARGV_FIRST_ARGUMENT].lower() in ['-h', '--help', 'help']:
parser.print_help()
sys.exit(EXIT_CMD_SUCCESS)

# Get the pipeline class based on the name
pipeline_name = subcommand_args[ARGV_PIPELINE_NAME]
pipeline_instance = self.get_pipeline_instance(pipeline_name)

if pipeline_instance is not None:
# Parse the pipeline arguments and inject them into the pipeline class
run_args_parser = argparse.ArgumentParser(prog='operon batch-run {}'.format(pipeline_name), add_help=False)
run_args_parser.add_argument('--pipeline-config',
default=os.path.join(self.home_configs, '{}.json'.format(pipeline_name)),
help='Path to a config file to use for this run')
run_args_parser.add_argument('--parsl-config',
help='Path to a JSON file containing a Parsl config')
run_args_parser.add_argument('--logs-dir', default='.', help='Path to a directory to store log files')
run_args_parser.add_argument('--input-matrix',
help=('Tab-separated file with a header and a row of arguments for each '
'sample or unit to be run. Consult the documentation for details '
'on the expected format.'))
run_args_parser.add_argument('--literal-input', action='store_true',
help=('If provided, each line of the input matrix will be interpreted as '
'if typed directly into the command line.'))
run_args_parser.add_argument('--separate-pools', action='store_true',
help=('If provided, Operon will run each sample or unit with its own '
'pool of resources, essentially like calling a separate Operon '
'instance for each sample or unit.'))
run_args_parser.add_argument('-h', '--help', action='store_true', default=argparse.SUPPRESS,
help='Show help message for run args and pipeline args.')

# Gather arguments for this batch run overall
run_args = vars(run_args_parser.parse_args(subcommand_args[1:]))

# Create a parser for the pipeline args
pipeline_args_parser = argparse.ArgumentParser(add_help=False)
pipeline_instance.arguments(pipeline_args_parser)
batch_pipeline_args = list()

# If -h given to run args, print help message from run and pipeline args and quit
if run_args.get('help'):
sys.stderr.write('For the batch run:\n')
run_args_parser.print_help()
sys.stderr.write('\nFor the pipeline {}:\n'.format(pipeline_name))
pipeline_args_parser.print_help()
sys.exit()

# Make --input-matrix a required argument
if not run_args['input_matrix']:
run_args_parser.print_help()
sys.stderr.write('\nerror: the following arguments are required: --input-matrix\n')
sys.exit()

# Parse the input matrix
with open(run_args['input_matrix']) as input_matrix:
if not run_args['literal_input']:
headers = next(input_matrix).strip().split('\t')
# For each run in this batch run
for line in input_matrix:
positionals, optionals = list(), list()
record = line.strip().split('\t')

# For each argument in this run add to either optional or positional
for i, record_item in enumerate(record):
record_header = headers[i]
if record_header.startswith('positional_'):
positionals.append((int(record_header.split('_')[-1]), record_item))
else:
# Determine whether this is a singleton argument
if record_item.strip().lower() == 'true':
# Include singleton
optionals.append(record_header)
elif record_item.strip().lower() in {'#true', '#false'}:
# Include optional with literal 'true' or 'false' value
optionals.extend([record_header, record_item.strip().strip('#')])
elif record_item.strip().lower() != 'false':
# Include normal optional
optionals.extend([record_header] + record_item.split())
# Note: If value is 'false' then none of these will match, so the optional
# won't be included

# Put positional arguments into positional order
positionals = [p[1] for p in sorted(positionals, key=lambda r: r[0])]

# Parse arguments with pipeline parser
pipeline_args = vars(pipeline_args_parser.parse_args(optionals + positionals))
if 'logs_dir' not in pipeline_args:
pipeline_args['logs_dir'] = run_args['logs_dir']

# Add this run to the batch run
batch_pipeline_args.append(pipeline_args)
else:
for literal_line in input_matrix:
# Parse arguments with pipeline parser
pipeline_args = vars(pipeline_args_parser.parse_args(literal_line.strip().split()))
if 'logs_dir' not in pipeline_args:
pipeline_args['logs_dir'] = run_args['logs_dir']

# Add this run to the batch run
batch_pipeline_args.append(pipeline_args)

# Run the pipeline in batch
pipeline_instance._run_batch_pipeline(
run_args=run_args,
pipeline_config=parse_pipeline_config(run_args['pipeline_config']),
batch_pipeline_args=batch_pipeline_args
)
else:
# If pipeline class doesn't exist, exit immediately
sys.stderr.write('Pipeline {name} does not exist in {home}\n'.format(
name=pipeline_name,
home=self.home_pipelines + '/'
))
sys.exit(EXIT_CMD_SYNTAX_ERROR)
2 changes: 1 addition & 1 deletion operon/_cli/subcommands/run.py
@@ -44,7 +44,7 @@ def run(self, subcommand_args):
pipeline_instance.arguments(pipeline_args_parser)
pipeline_args = vars(pipeline_args_parser.parse_args(subcommand_args[1:]))

pipeline_instance._run_pipeline(
pipeline_instance._run_single_pipeline(
pipeline_args=pipeline_args,
pipeline_config=parse_pipeline_config(pipeline_args['pipeline_config'])
)
100 changes: 77 additions & 23 deletions operon/components.py
@@ -401,32 +401,83 @@ class ParslPipeline(object):
# Temporary directory to send stream output of un-Redirected apps
_pipeline_run_temp_dir = None

def _run_pipeline(self, pipeline_args, pipeline_config):
def _run_batch_pipeline(self, run_args, pipeline_config, batch_pipeline_args):
# Setup pipeline run
ParslPipeline._setup_run(
logs_dir=run_args['logs_dir'],
pipeline_config=pipeline_config,
pipeline_class=self.__class__
)

for pipeline_args in batch_pipeline_args:
self.pipeline(pipeline_args, pipeline_config)
workflow_graph = ParslPipeline._assemble_graph(_ParslAppBlueprint._blueprints.values())

# Register apps and data with Parsl, get all app futures and temporary files
pipeline_futs, tmp_files = ParslPipeline._register_workflow(
workflow_graph=workflow_graph,
dfk=ParslPipeline._get_dfk(
pipeline_args_parsl_config=run_args['parsl_config'],
pipeline_config_parsl_config=pipeline_config.get('parsl_config'),
pipeline_default_parsl_config=self.parsl_configuration()
)
)

# Monitor the run to completion
ParslPipeline._monitor_run(
pipeline_futs=pipeline_futs,
tmp_files=tmp_files
)

def _run_single_pipeline(self, pipeline_args, pipeline_config):
# Setup pipeline run
ParslPipeline._setup_run(
logs_dir=pipeline_args['logs_dir'],
pipeline_config=pipeline_config,
pipeline_class=self.__class__
)

# Run pipeline to register Software and assemble workflow graph
self.pipeline(pipeline_args, pipeline_config)
workflow_graph = ParslPipeline._assemble_graph(_ParslAppBlueprint._blueprints.values())

# Register apps and data with Parsl, get all app futures and temporary files
pipeline_futs, tmp_files = ParslPipeline._register_workflow(
workflow_graph=workflow_graph,
dfk=ParslPipeline._get_dfk(
pipeline_args_parsl_config=pipeline_args['parsl_config'],
pipeline_config_parsl_config=pipeline_config.get('parsl_config'),
pipeline_default_parsl_config=self.parsl_configuration()
)
)

# Monitor the run to completion
ParslPipeline._monitor_run(
pipeline_futs=pipeline_futs,
tmp_files=tmp_files
)

@staticmethod
def _setup_run(logs_dir, pipeline_config, pipeline_class):
# Ensure the pipeline() method is overridden
if 'pipeline' not in vars(self.__class__):
if 'pipeline' not in vars(pipeline_class):
raise MalformedPipelineError('Pipeline has no method pipeline()')

# Set up logs dir
os.makedirs(pipeline_args['logs_dir'], exist_ok=True)
setup_logger(pipeline_args['logs_dir'])
os.makedirs(logs_dir, exist_ok=True)
setup_logger(logs_dir)

# Set up temp dir
ParslPipeline._pipeline_run_temp_dir = tempfile.TemporaryDirectory(
dir=pipeline_args['logs_dir'],
dir=logs_dir,
suffix='__operon'
)

# Run the pipeline to populate Software instances and construct the workflow graph
# Give pipeline config to Software class
Software._pipeline_config = copy(pipeline_config)
self.pipeline(pipeline_args, pipeline_config)
workflow_graph = self._assemble_graph(_ParslAppBlueprint._blueprints.values())

# Register apps and data with Parsl, get all app futures and temporary files
pipeline_futs, tmp_files = self._register_workflow(
workflow_graph,
self._get_dfk(pipeline_args, pipeline_config)
)

@staticmethod
def _monitor_run(pipeline_futs, tmp_files):
# Record start time
start_time = datetime.now()
logger.info('Started pipeline run\n@operon_start {}'.format(str(start_time)))
@@ -535,17 +586,18 @@ def running_listener(q, pipeline_futs):
app_name=app_name
))

def _get_dfk(self, pipeline_args, pipeline_config):
@staticmethod
def _get_dfk(pipeline_args_parsl_config, pipeline_config_parsl_config, pipeline_default_parsl_config):
# 1) Config defined at runtime on the command line
if pipeline_args['parsl_config'] is not None:
loaded_config = cycle_config_input_options(pipeline_args['parsl_config'])
if pipeline_args_parsl_config is not None:
loaded_config = cycle_config_input_options(pipeline_args_parsl_config)
if loaded_config is not None:
logger.info('Loaded Parsl config from command line arguments')
return loaded_config

# 2) Config defined for this pipeline in the pipeline configuration
if pipeline_config.get('parsl_config'):
loaded_config = cycle_config_input_options(pipeline_config['parsl_config'])
if pipeline_config_parsl_config:
loaded_config = cycle_config_input_options(pipeline_config_parsl_config)
if loaded_config is not None:
logger.info('Loaded Parsl config from pipeline config')
return loaded_config
@@ -567,18 +619,19 @@ def _get_dfk(self, pipeline_args, pipeline_config):
logger.error('Bad Parsl config when loading from installation default, trying the next option')

# 4) Config defined by the pipeline developer as a default, if no user config exists
if self.parsl_configuration():
if pipeline_default_parsl_config:
logger.info('Loaded Parsl config from pipeline default')
try:
return direct_config(self.parsl_configuration())
return direct_config(pipeline_default_parsl_config)
except ValueError:
pass # Silently fail, move on to next option

# 5) Config used if all above are absent, always run as a Thread Pool with 2 workers
logger.info('Loaded Parsl config using package default (2 basic threads)')
return dfk_with_config['basic-threads-2']()

def _register_workflow(self, workflow_graph, dfk):
@staticmethod
def _register_workflow(workflow_graph, dfk):
# Instantiate the App Factories
@App('python', dfk)
def _pythonapp(func_, func_args, func_kwargs, **kwargs):
@@ -676,7 +729,8 @@ def register_app(app_node_id, workflow_graph):

return app_futures, tmp_files

def _assemble_graph(self, blueprints):
@staticmethod
def _assemble_graph(blueprints):
# Initialize a directed graph
digraph = nx.DiGraph()
