
Commit

Merge 5f70d22 into ad55eab
lukasheinrich committed Jan 11, 2018
2 parents ad55eab + 5f70d22 commit 3685a72
Showing 11 changed files with 189 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
*.egg-info
venv
.coverage*
3 changes: 3 additions & 0 deletions packtivity/__init__.py
@@ -5,6 +5,8 @@
import packtivity.utils as utils
import packtivity.syncbackends as syncbackends

from packtivity.typedleafs import TypedLeafs

log = logging.getLogger(__name__)

def prepublish_default(spec,parameters,state):
@@ -24,6 +26,7 @@ def __call__(self, parameters, state,
asyncbackend = None, asyncwait = False,
waitperiod = 0.01, timeout = 43200 ): #default timeout is 12h

parameters = TypedLeafs(parameters)
if syncbackend and not asyncbackend:
return syncbackend.run(self.spec,parameters,state)
elif asyncbackend:
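
Note on the change above: callers can keep passing plain dicts, since the wrapper coerces them on entry. A minimal sketch of that behaviour, using only what this diff introduces:

from packtivity.typedleafs import TypedLeafs

parameters = {'outputfile': '{workdir}/helloworld.txt', 'n': 3}
typed = TypedLeafs(parameters)      # same coercion as in __call__ above
assert typed['n'] == 3              # dict-like access (MutableMapping)
assert typed.json() == parameters   # .json() round-trips to plain JSON data
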
6 changes: 4 additions & 2 deletions packtivity/asyncbackends.py
@@ -10,6 +10,8 @@
from .syncbackends import prepublish
from .syncbackends import packconfig

from packtivity.typedleafs import TypedLeafs

log = logging.getLogger(__name__)

class PacktivityProxyBase(object):
@@ -106,15 +108,15 @@ def proxyname(self):
def details(self):
d = super(ForegroundProxy, self).details() or {}
d.update(
result = self.result,
result = self.result.json(),
success = self.success
)
return d

@classmethod
def fromJSON(cls, data):
return cls(
data['proxydetails']['result'],
TypedLeafs(data['proxydetails']['result']),
data['proxydetails']['success'],
data['proxydetails']
)
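
The proxy now persists its result as plain JSON and rebuilds a TypedLeafs on load. A sketch of that round trip, with the dict below standing in for what details() and fromJSON exchange:

from packtivity.typedleafs import TypedLeafs

result = TypedLeafs({'outputfile': '/workdir/helloworld.txt'})
proxydetails = {'result': result.json(), 'success': True}   # what details() now stores
restored = TypedLeafs(proxydetails['result'])                # what fromJSON now does
assert restored.json() == result.json()
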
2 changes: 1 addition & 1 deletion packtivity/cli.py
@@ -110,4 +110,4 @@ def checkproxy(jsonfile):
click.secho('successful: {}'.format(successful))
if successful:
result = backend.result(proxy)
click.secho('result: {}'.format(json.dumps(result)))
click.secho('result: {}'.format(json.dumps(result.json())))
8 changes: 4 additions & 4 deletions packtivity/handlers/process_handlers.py
@@ -5,30 +5,30 @@
handlers,process = utils.handler_decorator()

@process('string-interpolated-cmd')
def stringinterp_handler(process_spec,parameters):
def stringinterp_handler(process_spec,parameters, state):
flattened = {k:v if not (type(v)==list) else ' '.join([str(x) for x in v]) for k,v in parameters.items()}
return {
'command':process_spec['cmd'].format(**flattened)
}

@process('interpolated-script-cmd')
def interp_script(process_spec,parameters):
def interp_script(process_spec,parameters, state):
flattened = {k:v if not (type(v)==list) else ' '.join([str(x) for x in v]) for k,v in parameters.items()}
return {
'script':process_spec['script'].format(**flattened),
'interpreter':process_spec['interpreter']
}

@process('manual-instructions-proc')
def manual_proc(process_spec,parameters):
def manual_proc(process_spec,parameters, state):
instructions = process_spec['instructions']
attrs = yaml.safe_dump(parameters,default_flow_style = False)
click.secho(instructions, fg = 'blue')
click.secho(attrs, fg = 'cyan')


@process('test-process')
def test_process(process_spec,parameters):
def test_process(process_spec,parameters, state):
return {
'a': 'complicated',
'job': 'with',
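
All process handlers now take the state as a third argument. A sketch of a hypothetical handler with the new signature; 'my-custom-cmd' and the handler body are illustrative only, and a stand-alone handler_decorator() call creates its own registry (build_job dispatches to the one in packtivity.handlers.process_handlers):

import packtivity.utils as utils

handlers, process = utils.handler_decorator()

@process('my-custom-cmd')
def my_custom_handler(process_spec, parameters, state):
    # state is now available alongside the spec and the parameters
    return {'command': process_spec['cmd'].format(**parameters)}
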
23 changes: 14 additions & 9 deletions packtivity/logutils.py
@@ -2,6 +2,7 @@
import logging
import importlib
import contextlib
import yaml

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

@@ -37,19 +38,23 @@ def setup_logging_topic(metadata,state,topic,return_logger = False):
resoures may dry up. that's why we need a specific end point.
The logger can be recreated multiple times
'''

log = logging.getLogger(get_topic_loggername(metadata,topic))
log.setLevel(logging.DEBUG)
log.propagate = False

if not log.handlers:
customhandlers = os.environ.get('PACKTIVITY_LOGGING_HANDLER')
if customhandlers:
module,func = customhandlers.split(':')
m = importlib.import_module(module)
f = getattr(m,func)
f(log,metadata,state,topic)
else:
default_logging_handlers(log,metadata,state,topic)
if yaml.load(os.environ.get('PACKTIVITY_LOGGING_DISABLE','false')):
pass
else:
if not log.handlers:
customhandlers = os.environ.get('PACKTIVITY_LOGGING_HANDLER')
if customhandlers:
module,func = customhandlers.split(':')
m = importlib.import_module(module)
f = getattr(m,func)
f(log,metadata,state,topic)
else:
default_logging_handlers(log,metadata,state,topic)

yield log if return_logger else None

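
Logging can now be switched off wholesale: the new branch parses PACKTIVITY_LOGGING_DISABLE with yaml.load, so any YAML-truthy value skips handler setup. A minimal sketch:

import os

os.environ['PACKTIVITY_LOGGING_DISABLE'] = 'true'    # 'yes', 'on' and '1' also parse as truthy
# subsequent setup_logging_topic() calls attach no handlers to the topic loggers

os.environ['PACKTIVITY_LOGGING_DISABLE'] = 'false'   # unset or 'false' keeps the previous behaviour
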
11 changes: 11 additions & 0 deletions packtivity/plugins.py
@@ -0,0 +1,11 @@
import importlib
import os

def enable_plugins(modules = None):
plugin_modules = modules or []
fromenv = os.environ.get('PACKTIVITY_PLUGINS','')
if fromenv:
plugin_modules += fromenv.split(',')
if plugin_modules:
for plugin in plugin_modules:
importlib.import_module(plugin)
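
The new plugins module imports additional modules either from an explicit list or from a comma-separated PACKTIVITY_PLUGINS environment variable, for example to load modules that register custom handlers at startup. A sketch of both entry points ('json' is only a stand-in module so the import succeeds):

import os
from packtivity.plugins import enable_plugins

enable_plugins(['json'])                   # explicit list of plugin modules

os.environ['PACKTIVITY_PLUGINS'] = 'json'  # comma-separated module names
enable_plugins()                           # picks the list up from the environment
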
10 changes: 6 additions & 4 deletions packtivity/syncbackends.py
@@ -4,6 +4,7 @@
import copy

import packtivity.logutils as logutils
from packtivity.typedleafs import TypedLeafs

class packconfig(object):
def __init__(self,**kwargs):
@@ -27,7 +28,7 @@ def build_job(process,parameters,state,pack_config):
impl = pack_config.get_impl('process',proc_type)
from .handlers.process_handlers import handlers as proc_handlers
handler = proc_handlers[proc_type][impl]
return handler(process,parameters)
return handler(process,parameters, state)

def build_env(environment,parameters,state,pack_config):
'''
@@ -50,7 +51,7 @@ def run_in_env(environment,job,state,metadata,pack_config):
takes a job and an environment and executes with the state context attached
'''
env_type = environment['environment_type']
impl = pack_config.get_impl('environment',env_type)
impl = pack_config.get_impl('executor',env_type)
from .handlers.execution_handlers import handlers as exec_handlers
handler = exec_handlers[env_type][impl]
return handler(environment,state,job,metadata)
@@ -60,8 +61,8 @@ def publish(publisher,parameters,state, pack_config):
impl = pack_config.get_impl('publisher',pub_type)
from .handlers.publisher_handlers import handlers as pub_handlers
handler = pub_handlers[pub_type][impl]
return handler(publisher,parameters,state)

pubdata = handler(publisher,parameters,state)
return TypedLeafs(pubdata)

def model_parameters(parameters, state):
if not state: return parameters
@@ -88,6 +89,7 @@ def prepublish(spec, parameters, state, pack_config):
def run_packtivity(spec, parameters,state,metadata,config):
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
try:
parameters = parameters.json()
parameters = model_parameters(parameters, state)
if spec['process'] and spec['environment']:
job = build_job(spec['process'], parameters, state, config)
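
Two notes on this file: run_in_env now resolves its implementation under the 'executor' key of the pack config instead of 'environment', and publish() wraps whatever a publisher handler returns in TypedLeafs (with run_packtivity unwrapping incoming parameters via .json() before parameter modelling). A sketch of what callers of publish() now get back, with the literal dict standing in for a handler's output:

from packtivity.typedleafs import TypedLeafs

pubdata = TypedLeafs({'outputfile': '/workdir/helloworld.txt'})   # as now returned by publish()
assert pubdata.jsonpointer('/outputfile') == '/workdir/helloworld.txt'
assert pubdata.json() == {'outputfile': '/workdir/helloworld.txt'}
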
140 changes: 140 additions & 0 deletions packtivity/typedleafs.py
@@ -0,0 +1,140 @@
import json
import jq
import jsonpointer
import jsonpath_rw
import copy
import base64
import importlib
import collections

class LeafModel(object):
def __init__(self, spec):
self.datamodel = spec or {'keyword': None, 'types': {}}
self._types2str, self._str2types = {}, {}
for name,module_class in self.datamodel['types'].items():
m, c = module_class.split(':')
c = getattr(importlib.import_module(m),c)
self._types2str[c] = name
self._str2types[name] = c
self.keyword = self.datamodel['keyword']
self.leaf_magic = '___leaf___'

def leaf_encode(self,obj):
return self.leaf_magic + base64.b64encode(json.dumps(self.dumper(obj)))

@staticmethod
def leaf_decode(str):
return json.loads(base64.b64decode(str))

def loader(self, spec, idleafs):
if not self.keyword: return spec

found_identifiers = set([self.keyword]).intersection(set(spec.keys()))
found_identifiers = {k:spec[k] for k in found_identifiers}
if not found_identifiers: return spec

for k in found_identifiers.keys():
spec.pop(k)
cl = self._str2types[found_identifiers[self.keyword]]
obj = cl.fromJSON(spec)
if not idleafs:
return obj
return self.leaf_encode(obj)

def dumper(self, obj):
json = obj.json()
if not type(obj)==TypedLeafs:
json[self.keyword] = self._types2str[type(obj)]
return json

class TypedLeafs(collections.MutableMapping):
def __init__(self,data, leafmodel = None, idleafs = False):
self.leafmodel = leafmodel
self._leafmodel = LeafModel(leafmodel)


if isinstance(data, TypedLeafs):
data = data.json()
self._jsonable = data

def __getitem__(self,key):
return self.typed().__getitem__(key)
def __iter__(self):
return self.typed().__iter__()
def __len__(self):
return self.typed().__len__()
def __delitem__(self, key):
self._jsonable.__delitem__(key)
def __setitem__(self, key, value):
data = self._jsonable
data.__setitem__(key, value)
self._jsonable = data

def __normalize(self,idleafs = True):
#wrap in a simple dict, necessary for if data is just a leaf value
data = {'data': self._load_from_string(self._dump_to_string(self._jsonable), typed=False)}
if idleafs:
ptrs = [jsonpointer.JsonPointer.from_parts(x) for x in jq.jq('paths(type=="string" and startswith("{}"))'.format(self._leafmodel.leaf_magic)).transform(data, multiple_output = True)]
for p in ptrs:
p.set(data,self._leafmodel.leaf_decode(p.get(data).replace('___leaf___','')))
self.__jsonable = data['data']

@property
def _jsonable(self):
return self.__jsonable

@_jsonable.setter
def _jsonable(self, value):
pass
self.__jsonable = value
self.__normalize()


@classmethod
def fromJSON(cls, data, deserialization_opts):
return cls(data, deserialization_opts['leafmodel'], deserialization_opts['idleafs'])

def _load_from_string(self,jsonstring, typed = True, idleafs = False):
if typed:
data = json.loads(jsonstring, object_hook = lambda spec: self._leafmodel.loader(spec, idleafs))
return data
else:
return json.loads(jsonstring)

def _dump_to_string(self,data):
return json.dumps(data, default = self._leafmodel.dumper)


### representation methods
def json(self):
return self._jsonable

def typed(self, idleafs = False):
return self._load_from_string(json.dumps(self._jsonable, sort_keys = True), typed=True, idleafs = idleafs)

def copy(self):
return TypedLeafs(copy.deepcopy(self.typed()), self.leafmodel)

def asrefs(self):
data = copy.deepcopy(self.typed())
for p, v in self.leafs():
p.set(data, p)
return data

### QUERY methods
def resolve_ref(self, reference):
return reference.resolve(self.typed())

def jsonpointer(self,pointer_str):
return jsonpointer.JsonPointer(pointer_str).resolve( self.typed() )

def jsonpath(self,jsonpath_expression):
return jsonpath_rw.parse(jsonpath_expression).find( self.typed() )[0].value

def jq(self,jq_program, *args, **kwargs):
return TypedLeafs(jq.jq(jq_program).transform(self.typed(idleafs = True), *args, **kwargs), self.leafmodel, idleafs = True)

def leafs(self):
ptrs = [jsonpointer.JsonPointer.from_parts(parts) for parts in self.jq('leaf_paths', multiple_output = True).typed()]
for p in ptrs:
yield p, p.get(self.typed())
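
A usage sketch for the new TypedLeafs container, restricted to plain JSON data; a datamodel of the form {'keyword': ..., 'types': {name: 'module:Class'}} would additionally decode tagged dicts into custom leaf objects via their fromJSON()/json() methods:

from packtivity.typedleafs import TypedLeafs

tl = TypedLeafs({'outputs': {'plot': '/workdir/plot.png', 'count': 3}})

assert tl.json() == {'outputs': {'plot': '/workdir/plot.png', 'count': 3}}
assert tl.jsonpointer('/outputs/count') == 3                  # RFC 6901 pointer lookup
assert tl.jsonpath('$.outputs.plot') == '/workdir/plot.png'   # jsonpath_rw expression
assert tl.jq('.outputs.count + 1').json() == 4                # jq program, result re-wrapped

for pointer, value in tl.leafs():                             # iterate over all leaf values
    print(pointer.path, value)
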
2 changes: 2 additions & 0 deletions setup.py
@@ -10,6 +10,8 @@
'click',
'glob2',
'jsonpointer',
'jsonpath_rw',
'jq',
'psutil',
'yadage-schemas',
'mock',
5 changes: 3 additions & 2 deletions tests/test_asyncbackends.py
@@ -1,11 +1,12 @@
from packtivity.asyncbackends import MultiProcBackend
from packtivity.typedleafs import TypedLeafs

def test_create_multiproc():
MultiProcBackend(4)

def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec):
basic_localfs_state.ensure()
pars = {'outputfile': '{workdir}/helloworld.txt'}
pars = TypedLeafs({'outputfile': '{workdir}/helloworld.txt'})
backend = MultiProcBackend(2)
proxy = backend.submit(localproc_packspec,pars,basic_localfs_state)
while not backend.ready(proxy):
@@ -17,7 +18,7 @@ def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec):

def test_multiproc_fail(tmpdir,basic_localfs_state,localproc_pack_fail):
basic_localfs_state.ensure()
pars = {'outputfile': '{workdir}/helloworld.txt'}
pars = TypedLeafs({'outputfile': '{workdir}/helloworld.txt'})

backend = MultiProcBackend(2)
proxy = backend.submit(localproc_pack_fail,pars,basic_localfs_state)
