
Commit

Merge 49fed3c into 55b44d6
lukasheinrich committed Dec 6, 2017
2 parents 55b44d6 + 49fed3c commit c7f890e
Showing 4 changed files with 29 additions and 23 deletions.
11 changes: 1 addition & 10 deletions packtivity/cli.py
@@ -13,12 +13,6 @@

log = logging.getLogger(__name__)

def finalize_input(jsondata,state):
for path,value in utils.leaf_iterator(jsondata):
actualval = state.contextualize_data(value)
path.set(jsondata,actualval)
return jsondata

def getinit_data(initfiles,parameters):
'''
get initial data from both a list of files and a list of 'pname=pvalue'
@@ -38,7 +32,6 @@ def getinit_data(initfiles,parameters):
@click.option('--parameter', '-p', multiple=True)
@click.option('-r', '--read', multiple=True, default = [])
@click.option('-w', '--write', multiple=True, default = [os.curdir])
@click.option('--contextualize/--no-contextualize', default = True)
@click.option('-s','--state', default = '')
@click.option('-t','--toplevel', default = os.getcwd())
@click.option('-c','--schemasource', default = yadageschemas.schemadir)
@@ -49,7 +42,7 @@ def getinit_data(initfiles,parameters):
@click.option('-x','--proxyfile',default = 'proxy.json')
@click.argument('spec')
@click.argument('parfiles', nargs = -1)
def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncwait,contextualize,validate,verbosity,backend,proxyfile):
def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncwait,validate,verbosity,backend,proxyfile):
logging.basicConfig(level = getattr(logging,verbosity))

spec = utils.load_packtivity(spec,toplevel,schemasource,validate)
@@ -62,8 +55,6 @@ def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncw
state = LocalFSState(state['readwrite'],state['readonly'])
state.ensure()

if contextualize:
parameters = finalize_input(parameters,state)

is_sync, backend = bkutils.backend_from_string(backend)
backend_kwargs = {
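For context, the (collapsed) getinit_data helper above merges parameter files with 'pname=pvalue' overrides before runcli hands them to the backend. The following is an illustrative sketch only, not the actual packtivity implementation; it assumes parameter files are YAML/JSON mappings and that overrides are plain 'pname=pvalue' strings.

# Hypothetical sketch of getinit_data's described behaviour (not the real code).
import yaml

def getinit_data_sketch(initfiles, parameters):
    initdata = {}
    for initfile in initfiles:
        # each file is assumed to be a YAML/JSON mapping of parameter names to values
        with open(initfile) as f:
            initdata.update(**yaml.safe_load(f))
    for override in parameters:
        # '-p pname=pvalue' style overrides win over file-provided values
        key, value = override.split('=', 1)
        initdata[key] = yaml.safe_load(value)
    return initdata

# e.g. getinit_data_sketch(['pars.yml'], ['outputfile={workdir}/helloworld.txt'])
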
18 changes: 17 additions & 1 deletion packtivity/syncbackends.py
@@ -5,6 +5,8 @@

import packtivity.logutils as logutils
from packtivity.handlers import enable_plugins
from .utils import leaf_iterator

enable_plugins()

class packconfig(object):
@@ -64,29 +66,43 @@ def publish(publisher,parameters,state, pack_config):
handler = pub_handlers[pub_type][impl]
return handler(publisher,parameters,state)


def contextualize_parameters(parameters, state):
if not state: return parameters

contextualized_parameters = copy.deepcopy(parameters)
for leaf_pointer, leaf_value in leaf_iterator(parameters):
leaf_pointer.set(contextualized_parameters,state.contextualize_data(leaf_value))

return contextualized_parameters

def prepublish(spec, parameters, state, pack_config):
'''
attempts to prepublish output data, returns None if not possible
'''
parameters = contextualize_parameters(parameters, state)
pub = spec['publisher']

if pub['publisher_type'] in ['frompar-pub','constant-pub']:
return publish(pub,parameters,state,pack_config)
if pub['publisher_type'] in ['interpolated-pub', 'fromparjq-pub']:
from .statecontexts.posixfs_context import LocalFSState
if not state:
return publish(pub,parameters,state,pack_config)
if type(state) == LocalFSState:
if pub['glob'] == False or len(state.readwrite)==0:
if pub['glob'] == False or len(state.readwrite)==0:
return publish(pub,parameters,state,pack_config)
return None

def run_packtivity(spec, parameters,state,metadata,config):
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
try:
parameters = contextualize_parameters(parameters, state)
if spec['process'] and spec['environment']:
job = build_job(spec['process'], parameters, state, config)
env = build_env(spec['environment'], parameters, state, config)
run_in_env(env,job,state,metadata,config)

pubdata = publish(spec['publisher'], parameters,state, config)
log.info('publishing data: %s',pubdata)
return pubdata
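The new contextualize_parameters walks every leaf of the (possibly nested) parameter document and rewrites it through state.contextualize_data, leaving the caller's dict untouched; prepublish and run_packtivity now both call it, which is why the CLI flag and the explicit calls in the tests below could be dropped. A minimal sketch of the pattern, with a stand-in state object in place of packtivity's LocalFSState and a flat dict in place of the JSON-pointer based leaf_iterator:

# Sketch only: 'DummyState' and the flat-dict walk are simplifications for illustration.
import copy

class DummyState(object):
    def __init__(self, workdir):
        self.workdir = workdir
    def contextualize_data(self, value):
        # real states interpolate placeholders such as '{workdir}' into concrete paths
        return value.format(workdir=self.workdir) if isinstance(value, str) else value

def contextualize_parameters_sketch(parameters, state):
    if not state: return parameters
    contextualized = copy.deepcopy(parameters)
    # packtivity's leaf_iterator yields (JSON pointer, value) pairs; a flat dict suffices here
    for key, value in parameters.items():
        contextualized[key] = state.contextualize_data(value)
    return contextualized

pars = {'outputfile': '{workdir}/helloworld.txt'}
print(contextualize_parameters_sketch(pars, DummyState('/tmp/work')))
# -> {'outputfile': '/tmp/work/helloworld.txt'}
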
5 changes: 2 additions & 3 deletions tests/test_asyncbackends.py
@@ -5,7 +5,7 @@ def test_create_multiproc():

def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
backend = MultiProcBackend(2)
proxy = backend.submit(localproc_packspec,pars,basic_localfs_state)
while not backend.ready(proxy):
@@ -17,12 +17,11 @@ def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec):

def test_multiproc_fail(tmpdir,basic_localfs_state,localproc_pack_fail):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}

backend = MultiProcBackend(2)
proxy = backend.submit(localproc_pack_fail,pars,basic_localfs_state)
while not backend.ready(proxy):
pass
assert backend.successful(proxy) == False
backend.fail_info(proxy)

18 changes: 9 additions & 9 deletions tests/test_main.py
@@ -3,58 +3,58 @@

def test_pack_call_local(tmpdir,basic_localfs_state,localproc_pack):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
localproc_pack(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker(tmpdir,basic_localfs_state,dockeproc_pack):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
dockeproc_pack(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_local_fail(tmpdir,basic_localfs_state,localproc_pack_fail,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
with pytest.raises(RuntimeError):
localproc_pack_fail(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker_fail(tmpdir,basic_localfs_state,docker_pack_fail,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
with pytest.raises(RuntimeError):
docker_pack_fail(parameters = pars, state = basic_localfs_state)

def test_pack_call_docker_script_fail(tmpdir,basic_localfs_state,docker_script_pack_fail,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
with pytest.raises(RuntimeError):
docker_script_pack_fail(parameters = pars, state = basic_localfs_state)

def test_pack_call_docker_script(tmpdir,basic_localfs_state,dockeproc_script_pack):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
dockeproc_script_pack(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker_async(tmpdir,basic_localfs_state,dockeproc_script_pack,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
dockeproc_script_pack(parameters = pars, state = basic_localfs_state, asyncbackend = default_async, asyncwait = True)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker_script_async(tmpdir,basic_localfs_state,dockeproc_script_pack,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
proxy = dockeproc_script_pack(parameters = pars, state = basic_localfs_state, asyncbackend = default_async)
while not default_async.ready(proxy): pass
default_async.result(proxy)
assert tmpdir.join('helloworld.txt').check()

def test_pack_prepublish(tmpdir,basic_localfs_state,localproc_pack,default_sync):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}

assert default_sync.prepublish(localproc_pack.spec,pars,basic_localfs_state) == {
'output': str(tmpdir.join('helloworld.txt'))
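In the updated tests the '{workdir}' placeholder is passed through verbatim; prepublish contextualizes it against the state internally, so the expected result is simply the template rendered against the state's read-write directory. A worked example under assumed values (the real test uses pytest's tmpdir as that directory):

# Worked example with an assumed read-write directory, not output from the actual test run.
pars = {'outputfile': '{workdir}/helloworld.txt'}
workdir = '/tmp/pytest-0'   # stands in for basic_localfs_state's read-write directory
resolved = pars['outputfile'].format(workdir=workdir)
assert resolved == '/tmp/pytest-0/helloworld.txt'
# a frompar-style publisher then maps it onto the declared output key, e.g. {'output': resolved}
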
