Skip to content

Commit

Permalink
Merge branch 'master' into resetfix
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasheinrich committed Mar 23, 2018
2 parents f784631 + d7e58b7 commit 41658e2
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 18 deletions.
13 changes: 10 additions & 3 deletions packtivity/asyncbackends.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ def json(self):
class ExternalAsyncProxy(PacktivityProxyBase):
def __init__(self, jobproxy, spec, statedata, pardata):
self.jobproxy = jobproxy
self.result = None
self.spec = spec
self.statedata = statedata
self.pardata = pardata
self._details = None
self._details = jobproxy

class ExternalAsyncBackend(object):
def __init__(self, job_backend, deserialization_opts = None, config = None):
Expand All @@ -62,11 +63,16 @@ def submit(self, spec, parameters, state, metadata = None):
return ExternalAsyncProxy(jobproxy, spec, state.json(), parameters.json())

def result(self,resultproxy):
if resultproxy.result is not None:
return resultproxy.result

state = load_state(resultproxy.statedata, self.deserialization_opts)
parameters = TypedLeafs(resultproxy.pardata, state.datamodel)
pubdata = publish(resultproxy.spec['publisher'], parameters,state,self.config)
log.info('publishing data: %s',pubdata)
pubdata = finalize_outputs(pubdata)
return pubdata
resultproxy.result = pubdata
return resultproxy.result

def ready(self,resultproxy):
return self.job_backend.ready(resultproxy.jobproxy)
Expand All @@ -82,7 +88,7 @@ def __init__(self, config = None):
self.pool = multiprocessing.Pool(1)
self.config = packconfig(**config) if config else packconfig()

def submit(self, job, env,state, metadata):
def submit(self, job, env, state, metadata):
nullary = functools.partial(run_in_env,
job = job,
environment = env,
Expand Down Expand Up @@ -245,6 +251,7 @@ def fail_info(self,resultproxy):
default_celeryapp = Celery('defaultapp')
default_celeryapp.conf.update(
task_serializer = 'pickle',
result_serializer = 'pickle',
accept_content = ['pickle','json'],
broker_url = os.environ.get('PACKTIVITY_CELERY_REDIS_BROKER','redis://localhost:6379'),
result_backend = os.environ.get('PACKTIVITY_CELERY_REDIS_BROKER','redis://localhost:6379'),
Expand Down
2 changes: 2 additions & 0 deletions packtivity/handlers/environment_handlers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import packtivity.utils as utils
import jq
import copy
handlers,environment = utils.handler_decorator()

@environment('docker-encapsulated')
def docker(environment,parameters,state):
environment = copy.deepcopy(environment)

jsonpars = parameters.json()
for p,v in parameters.leafs():
Expand Down
8 changes: 4 additions & 4 deletions packtivity/handlers/execution_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def docker_execution_cmdline(state,environment,log,metadata,stdin,cmd_argv):
command = quoted_string
)

def run_docker_with_script(environment,job,log):
def script_argv(environment,job,log):
script = job['script']
interpreter = job['interpreter']

Expand All @@ -184,7 +184,7 @@ def run_docker_with_script(environment,job,log):
in_docker_cmd = envmod+indocker
return ['sh', '-c', in_docker_cmd], script

def run_docker_with_oneliner(environment,job,log):
def command_argv(environment,job,log):
log.debug('''\n\
--------------
running one liner in container.
Expand Down Expand Up @@ -282,10 +282,10 @@ def docker_enc_handler(environment,state,job,metadata):
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
if 'command' in job:
stdin = False
container_argv, container_stdin = run_docker_with_oneliner(environment,job,log)
container_argv, container_stdin = command_argv(environment,job,log)
elif 'script' in job:
stdin = True
container_argv, container_stdin = run_docker_with_script(environment,job,log)
container_argv, container_stdin = script_argv(environment,job,log)
else:
raise RuntimeError('do not know yet how to run this...')

Expand Down
31 changes: 23 additions & 8 deletions packtivity/typedleafs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,28 @@ def __init__(self, spec):
else:
raise RuntimeError('not sure how to interpret type def %s',class_def)
self.keyword = self.datamodel['keyword']
self.leaf_magic = '___leaf___'
self.canonical_leaf_magic = 'b64json://'
lits = self.datamodel.get('literals')

def leaf_encode(self,obj):
return self.leaf_magic + base64.b64encode(json.dumps(self.dumper(obj)).encode('utf-8')).decode('utf-8')
self.magics = [self.canonical_leaf_magic]

if lits:
m, f = lits['parser'].split(':')
self.litparser = getattr(importlib.import_module(m),f)
self.magics += lits['magics']

@staticmethod
def leaf_decode(str):
return json.loads(base64.b64decode(str).decode('utf-8'))
def leaf_encode(self,obj):
return self.canonical_leaf_magic + base64.b64encode(json.dumps(self.dumper(obj)).encode('utf-8')).decode('utf-8')

def leaf_decode(self,encoded):
for m in self.magics:
if encoded.startswith(m):
if m == self.canonical_leaf_magic:
magic_replaced = encoded.replace(self.canonical_leaf_magic,'')
return json.loads(base64.b64decode(magic_replaced).decode('utf-8'))
else:
return self.litparser(encoded)
raise RuntimeError('cannot decode {} '.format(encoded))

def loader(self, spec, idleafs):
if not self.keyword: return spec
Expand Down Expand Up @@ -84,9 +98,10 @@ def __normalize(self,idleafs = True):
#wrap in a simple dict, necessary for if data is just a leaf value
data = {'data': self._load_from_string(self._dump_to_string(self._jsonable), typed=False)}
if idleafs:
ptrs = [jsonpointer.JsonPointer.from_parts(x) for x in jq.jq('paths(type=="string" and startswith("{}"))'.format(self._leafmodel.leaf_magic)).transform(data, multiple_output = True)]
magicexpr = ' or '.join(['startswith("{}")'.format(m) for m in self._leafmodel.magics])
ptrs = [jsonpointer.JsonPointer.from_parts(x) for x in jq.jq('paths(type=="string" and ({}))'.format(magicexpr)).transform(data, multiple_output = True)]
for p in ptrs:
p.set(data,self._leafmodel.leaf_decode(p.get(data).replace('___leaf___','')))
p.set(data,self._leafmodel.leaf_decode(p.get(data)))
self.__jsonable = data['data']

@property
Expand Down
6 changes: 3 additions & 3 deletions tests/test_executions.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from packtivity.handlers.publisher_handlers import handlers
from packtivity.typedleafs import TypedLeafs

from packtivity.handlers.execution_handlers import run_docker_with_oneliner, docker_execution_cmdline
from packtivity.handlers.execution_handlers import command_argv, docker_execution_cmdline
import logging

def test_docker_cvmfs(tmpdir,basic_localfs_state, docker_env_resources, monkeypatch):
state = basic_localfs_state
environment = docker_env_resources.spec['environment']
log = logging.getLogger('test')
job = {'command': 'echo hello world'}
container_argv, stdin = run_docker_with_oneliner(environment,job,log)
container_argv, stdin = command_argv(environment,job,log)

assert container_argv == ['sh','-c','echo hello world']
assert stdin == None
Expand Down Expand Up @@ -47,7 +47,7 @@ def test_docker_auth(tmpdir,basic_localfs_state, docker_env_resources, monkeypat
environment = docker_env_resources.spec['environment']
log = logging.getLogger('test')
job = {'command': 'echo hello world'}
container_argv, stdin = run_docker_with_oneliner(environment,job,log)
container_argv, stdin = command_argv(environment,job,log)

assert container_argv == ['sh','-c','echo hello world']
assert stdin == None
Expand Down

0 comments on commit 41658e2

Please sign in to comment.