Skip to content
Browse files

Merge pull request #408 from kyleam/run-subjobs

run: Parallel subjobs
  • Loading branch information
yarikoptic committed Aug 9, 2019
2 parents 23e59a5 + a823a4e commit 0b5bc61638f2c3155068c1133d78cf64112bc00c
@@ -85,6 +85,7 @@ install:
- sudo sed -i -e 's/^Defaults.*secure_path.*$//' /etc/sudoers
# for SVN tests (SVNRepoShim._ls_files_command())
- sudo apt-get install sqlite3
- sudo apt-get install moreutils # for concurrent jobs with local orchestrator
- travis_retry sudo eatmydata apt-get install singularity-container
- if [ ! -z "${INSTALL_DATALAD:-}" ]; then tools/ci/install_datalad; fi
- if [ ! -z "${INSTALL_CONDOR:-}" ]; then tools/ci/install_condor; fi
@@ -11,7 +11,9 @@

from argparse import REMAINDER
import collections
import glob
import logging
import itertools
import textwrap
import yaml

@@ -64,6 +66,94 @@ def update(d, u):
return initial

def _parse_batch_params(params):
"""Transform batch parameter strings into lists of tuples.
params : list of str
The string should have the form "key=val1,val2,val3".
A generator that, for each key, yields a list of key-value tuple pairs.
def maybe_glob(x):
return glob.glob(x) if glob.has_magic(x) else [x]

seen_keys = set()
for param in params:
if "=" not in param:
raise ValueError(
"param value should be formatted as 'key=value,...'")
key, value_str = param.split("=", maxsplit=1)
if key in seen_keys:
raise ValueError("Key '{}' was given more than once".format(key))
yield [(key, v)
for v_unexpanded in value_str.split(",")
for v in maybe_glob(v_unexpanded)]

def _combine_batch_params(params):
"""Transform batch parameter strings into records.
params : list of str
The string should have the form "key=val1,val2,val3".
A generator that yields a record, computing the product from the values.
>>> from pprint import pprint
>>> params = ["k0=val1,val2,val3", "k1=val4,val5"]
>>> pprint(list(_combine_batch_params(params)))
[{'k0': 'val1', 'k1': 'val4'},
{'k0': 'val1', 'k1': 'val5'},
{'k0': 'val2', 'k1': 'val4'},
{'k0': 'val2', 'k1': 'val5'},
{'k0': 'val3', 'k1': 'val4'},
{'k0': 'val3', 'k1': 'val5'}]
if not params:
# Note: If we want to support pairing the ith elements rather than taking
# the product, we could add a parameter that signals to use zip() rather
# than product(). If we do that, we'll also want to check that the values
# for each key are the same length, probably in _parse_batch_params().
for i in itertools.product(*_parse_batch_params(params)):
yield dict(i)

def _resolve_batch_parameters(spec_file, params):
"""Determine batch parameters based on user input.
spec_file : str or None
Name of YAML file the defines records of parameters.
params : list of str or None
The string should have the form "key=val1,val2,val3".
List of records or None if neither `spec_file` or `params` is specified.
if spec_file and params:
raise ValueError(
"Batch parameters cannot be provided with a batch spec")

resolved = None
if spec_file:
with open(spec_file) as pf:
resolved = yaml.safe_load(pf)
elif params:
resolved = list(_combine_batch_params(params))
return resolved

JOB_PARAMETERS = collections.OrderedDict(
("root_directory", Orchestrator.root_directory),
@@ -79,6 +169,18 @@ def update(d, u):
"""Name of orchestrator. The orchestrator performs pre- and
post-command steps like setting up the directory for command execution
and storing the results."""),
"""YAML file that defines a series of records with parameters for
commands. A command will be constructed for each record, with record
values available in the command as well as the inputs and outputs as
"""Define batch parameters with 'KEY=val1,val2,...'. Different keys
can be specified by giving multiple values, in which case the product
of the values are taken. For example, 'subj=mei,satsuki' and 'day=1,2'
would expand to four records, pairing each subj with each day. Values
can be a glob pattern to match against the current working
("inputs, outputs",
"""Input and output files (list) to the command."""),
@@ -129,6 +231,21 @@ class Run(Interface):
doc=(JOB_PARAMETERS["orchestrator"] +
"[CMD: Use --list to see available orchestrators CMD]")),
args=("--batch-spec", "--bs"),
doc=(JOB_PARAMETERS["batch_spec"] +
" See [CMD: --batch-parameter CMD][PY: `batch_parameters` PY]"
" for an alternative method for simple combinations.")),
args=("--batch-parameter", "--bp"),
doc=(JOB_PARAMETERS["batch_parameters"] +
" See [CMD: --batch-spec CMD][PY: `batch_spec` PY]"
" for specifying more complex records.")),
args=("--job-spec", "--js"),
@@ -141,7 +258,7 @@ class Run(Interface):
args=("-p", "--job-parameter"),
args=("--job-parameter", "--jp"),
# TODO: Use nargs=+ like create's --backend-parameters? I'd rather
# use 'append' there.
@@ -187,6 +304,7 @@ class Run(Interface):
def __call__(command=None, message=None,
resref=None, resref_type="auto",
list_=None, submitter=None, orchestrator=None,
batch_spec=None, batch_parameters=None,
job_specs=None, job_parameters=None,
inputs=None, outputs=None,
@@ -231,6 +349,8 @@ def fmt(d):
"message": message,
"submitter": submitter,
"orchestrator": orchestrator,
"batch_spec": batch_spec,
"batch_parameters": batch_parameters,
"inputs": inputs,
"outputs": outputs,
@@ -243,6 +363,9 @@ def fmt(d):
spec = _combine_job_specs(_load_specs(job_specs or []) +
[job_parameters, cli_spec])

spec["batch_parameters"] = _resolve_batch_parameters(
spec.get("batch_spec"), spec.get("batch_parameters"))

# Treat "command" as a special case because it's a list and the
# template expects a string.
if not command and "command_str" in spec:
@@ -13,12 +13,15 @@
from unittest.mock import patch
import os
import os.path as op
import time

import pytest

from reproman.api import jobs
from reproman.api import run
from import _combine_batch_params
from import _combine_job_specs
from import _resolve_batch_parameters
from reproman.utils import chpwd
from reproman.utils import swallow_logs
from reproman.utils import swallow_outputs
@@ -27,6 +30,7 @@
from reproman.tests import fixtures
from reproman.tests.utils import create_tree

lgr = logging.getLogger("reproman.interface.tests.test_run")

# Tests that do not require a resource, registry, or orchestrator.

@@ -74,6 +78,87 @@ def test_combine_specs(specs, expected):
assert _combine_job_specs(specs) == expected

[([], []),
[{"a": "1"}, {"a": "2"}]),
(["a=1,2", "b=3"],
[{"a": "1", "b": "3"},
{"a": "2", "b": "3"}]),
(["a=1,2", "b=3,4"],
[{"a": "1", "b": "3"},
{"a": "1", "b": "4"},
{"a": "2", "b": "3"},
{"a": "2", "b": "4"}]),
[{"a": "1"},
{"a": "2=3"}]),
(["a= 1 spaces are preserved , 2"],
[{"a": " 1 spaces are preserved "},
{"a": " 2"}])],
ids=["empty", "one", "two, one varying", "two varying", "= in value",
def test_combine_batch_params(params, expected):
actual = list(sorted(_combine_batch_params(params),
key=lambda d: (d.get("a"), d.get("b"))))
assert len(actual) == len(expected)
assert actual == expected

def test_combine_batch_params_glob(tmpdir):
tmpdir = str(tmpdir)
create_tree(tmpdir, {"aaa": "a",
"subdir": {"b": "b", "c": "c"}})
with chpwd(tmpdir):
res = sorted(_combine_batch_params(["foo=a*,subdir/*,other"]),
key=lambda d: d["foo"])
assert list(res) == [
{"foo": "aaa"},
{"foo": "other"},
{"foo": "subdir/b"},
{"foo": "subdir/c"}]

def test_combine_batch_params_repeat_key():
with pytest.raises(ValueError):
list(_combine_batch_params(["a=1", "a=2"]))

def test_combine_batch_params_no_equal():
with pytest.raises(ValueError):

def test_run_batch_spec_and_params():
with pytest.raises(ValueError):
batch_spec="anything", batch_parameters="anything")

[([], ""),
- a: '1'
- a: '2'"""),
(["a=1,2", "b=3"],
- a: '1'
b: '3'
- a: '2'
b: '3'""")],
ids=["empty", "one", "two, one varying"])
def test_resolve_batch_params_eq(tmpdir, params, spec):
fname = op.join(str(tmpdir), "spec.yml")
with open(fname, "w") as fh:
from_param_str = _resolve_batch_parameters(spec_file=None, params=params)
from_spec = _resolve_batch_parameters(spec_file=fname, params=None)
assert from_param_str == from_spec

# Tests that require `context`.

@@ -156,6 +241,27 @@ def test_run_resource_specification(context):
assert "fromcli" in str(exc)

def try_fetch(fetch_fn, ntimes=5):
"""Helper to test asynchronous fetch.
def try_():
with swallow_logs(new_level=logging.INFO) as log:
return "Not fetching incomplete job" not in log.out

for i in range(1, ntimes + 1):
succeeded = try_()
if succeeded:
sleep_for = (2 ** i) / 2"Job is incomplete. Sleeping for %s seconds",
raise RuntimeError("All fetch attempts failed")

def test_run_and_fetch(context):
path = context["directory"]
run = context["run_fn"]
@@ -170,15 +276,12 @@ def test_run_and_fetch(context):


with swallow_logs(new_level=logging.INFO) as log:
with swallow_outputs() as output:
jobs(queries=[], status=True)
assert "myshell" in output.out
assert len(registry.find_job_files()) == 1
jobs(queries=[], action="fetch", all_=True)
assert len(registry.find_job_files()) == 0
jobs(queries=[], status=True)
assert "No jobs" in log.out
with swallow_outputs() as output:
jobs(queries=[], status=True)
assert "myshell" in output.out
assert len(registry.find_job_files()) == 1
try_fetch(lambda: jobs(queries=[], action="fetch", all_=True))
assert len(registry.find_job_files()) == 0

assert op.exists(op.join(path, "ok"))

@@ -212,7 +315,7 @@ def test_jobs_auto_fetch_with_query(context):
assert len(jobfiles) == 1
jobid = list(jobfiles.keys())[0]
with swallow_outputs():
try_fetch(lambda: jobs(queries=[jobid[3:]]))
assert len(registry.find_job_files()) == 0
assert op.exists(op.join(path, "ok"))

0 comments on commit 0b5bc61

Please sign in to comment.
You can’t perform that action at this time.