Skip to content

Commit

Permalink
persist external backend proxies
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasheinrich committed Mar 24, 2018
1 parent 2c4d2f1 commit d5ca560
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
39 changes: 31 additions & 8 deletions packtivity/asyncbackends.py
Expand Up @@ -3,6 +3,7 @@
import sys
import traceback
import os
import json
import logging
import yaml

Expand Down Expand Up @@ -39,13 +40,34 @@ def json(self):
}

class ExternalAsyncProxy(PacktivityProxyBase):
def __init__(self, jobproxy, spec, statedata, pardata):
def __init__(self, jobproxy, spec, statedata, pardata, resultdata = None):
self.jobproxy = jobproxy
self.result = None
self.resultdata = resultdata
self.spec = spec
self.statedata = statedata
self.pardata = pardata
self._details = jobproxy

def details(self):
try:
dumped_prox = json.dumps(self.jobproxy)
except TypeError:
dumped_prox = None
return {
'resultdata': self.resultdata,
'jobproxy': dumped_prox,
'spec': self.spec,
'statedata': self.statedata,
'pardata': self.pardata,
}

def proxyname(self):
return 'ExternalAsyncProxy'

@classmethod
def fromJSON(cls, data):
if not data['jobproxy']:
raise RuntimeError('not external backend proxy saved during serialization')
return cls(**data['proxydetails'])

class ExternalAsyncBackend(object):
def __init__(self, job_backend, deserialization_opts = None, config = None):
Expand All @@ -63,16 +85,17 @@ def submit(self, spec, parameters, state, metadata = None):
return ExternalAsyncProxy(jobproxy, spec, state.json(), parameters.json())

def result(self,resultproxy):
if resultproxy.result is not None:
return resultproxy.result

state = load_state(resultproxy.statedata, self.deserialization_opts)

if resultproxy.resultdata is not None:
return TypedLeafs(resultproxy.resultdata, state.datamodel)

parameters = TypedLeafs(resultproxy.pardata, state.datamodel)
pubdata = publish(resultproxy.spec['publisher'], parameters,state,self.config)
log.info('publishing data: %s',pubdata)
pubdata = finalize_outputs(pubdata)
resultproxy.result = pubdata
return resultproxy.result
resultproxy.resultdata = pubdata.json()
return pubdata

def ready(self,resultproxy):
return self.job_backend.ready(resultproxy.jobproxy)
Expand Down
4 changes: 4 additions & 0 deletions packtivity/backendutils.py
Expand Up @@ -30,6 +30,10 @@ def load_proxy(jsondata, deserialization_opts = None, best_effort_backend = True
if best_effort_backend:
_, backend = backend_from_string('fromenv')

if jsondata['proxyname'] == 'ExternalAsyncProxy':
from .asyncbackends import ExternalAsyncProxy
proxy = ExternalAsyncProxy.fromJSON(jsondata)

if jsondata['proxyname'] == 'CeleryProxy':
from .asyncbackends import CeleryProxy
proxy = CeleryProxy.fromJSON(jsondata)
Expand Down

0 comments on commit d5ca560

Please sign in to comment.