Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
NullPool logging messages appear during execution
  • Loading branch information
mxmrlv committed May 24, 2017
1 parent 3d22d36 commit 0c986842d52eca823ab92442dd9d77267e369ae8
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 29 deletions.
@@ -16,7 +16,6 @@
"""
A common context for both workflow and operation
"""

import logging
from contextlib import contextmanager
from functools import partial
@@ -33,6 +33,7 @@ def __init__(self, task_id, actor_id, **kwargs):
self._task_id = task_id
self._actor_id = actor_id
self._thread_local = threading.local()
self._destroy_session = kwargs.pop('destroy_session', False)
logger_level = kwargs.pop('logger_level', None)
super(BaseOperationContext, self).__init__(**kwargs)
self._register_logger(task_id=self.task.id, level=logger_level)
@@ -90,13 +91,21 @@ def serialization_dict(self):
}

@classmethod
def deserialize_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
if model_storage:
model_storage = aria.application_model_storage(**model_storage)
if resource_storage:
resource_storage = aria.application_resource_storage(**resource_storage)

return cls(model_storage=model_storage, resource_storage=resource_storage, **kwargs)
return cls(model_storage=model_storage,
resource_storage=resource_storage,
destroy_session=True,
**kwargs)

def close(self):
if self._destroy_session:
self.model.log._session.remove()
self.model.log._engine.dispose()


class NodeOperationContext(BaseOperationContext):
@@ -34,22 +34,25 @@ def __init__(self, ex_message, ex_type, ex_traceback):
self.ex_traceback = ex_traceback


def _http_request(socket_url, request, timeout):
response = urllib2.urlopen(
url=socket_url,
data=json.dumps(request),
timeout=timeout)
def _http_request(socket_url, request, method, timeout):
opener = urllib2.build_opener(urllib2.HTTPHandler)
request = urllib2.Request(socket_url, data=json.dumps(request))
request.get_method = lambda: method
response = opener.open(request, timeout=timeout)

if response.code != 200:
raise RuntimeError('Request failed: {0}'.format(response))
return json.loads(response.read())


def _client_request(socket_url, args, timeout):
def _client_request(socket_url, args, timeout, method='POST'):
response = _http_request(
socket_url=socket_url,
request={'args': args},
timeout=timeout)
payload = response['payload']
method=method,
timeout=timeout
)
payload = response.get('payload')
response_type = response.get('type')
if response_type == 'error':
ex_type = payload['type']
@@ -89,7 +92,7 @@ def _process_args(json_prefix, args):
def main(args=None):
args = _parse_args(args)
response = _client_request(
socket_url=args.socket_url,
args.socket_url,
args=_process_args(args.json_arg_prefix, args.args),
timeout=args.timeout)
if args.json_output:
@@ -100,6 +103,5 @@ def main(args=None):
response = str(response)
sys.stdout.write(response)


if __name__ == '__main__':
main()
@@ -42,16 +42,31 @@ def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
self._started.get(timeout=5)

def _start_server(self):
proxy = self

class BottleServerAdapter(bottle.ServerAdapter):
proxy = self

def close_session(self):
self.proxy.ctx.model.log._session.remove()

def run(self, app):

class Server(wsgiref.simple_server.WSGIServer):
allow_reuse_address = True
bottle_server = self

def handle_error(self, request, client_address):
pass

def serve_forever(self, poll_interval=0.5):
try:
wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
finally:
# Once shutdown is called, we need to close the session.
# If the session is not closed properly, it might raise warnings,
# or even lock the database.
self.bottle_server.close_session()

class Handler(wsgiref.simple_server.WSGIRequestHandler):
def address_string(self):
return self.client_address[0]
@@ -66,8 +81,8 @@ def log_request(*args, **kwargs): # pylint: disable=no-method-argument
app=app,
server_class=Server,
handler_class=Handler)
proxy.server = server
proxy._started.put(True)
self.proxy.server = server
self.proxy._started.put(True)
server.serve_forever(poll_interval=0.1)

def serve():
@@ -96,9 +111,10 @@ def _request_handler(self):
request = bottle.request.body.read() # pylint: disable=no-member
response = self._process(request)
return bottle.LocalResponse(
body=response,
body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
status=200,
headers={'content-type': 'application/json'})
headers={'content-type': 'application/json'}
)

def _process(self, request):
try:
@@ -109,10 +125,7 @@ def _process(self, request):
if isinstance(payload, exceptions.ScriptException):
payload = dict(message=str(payload))
result_type = 'stop_operation'
result = json.dumps({
'type': result_type,
'payload': payload
}, cls=modeling.utils.ModelJSONEncoder)
result = {'type': result_type, 'payload': payload}
except Exception as e:
traceback_out = StringIO.StringIO()
traceback.print_exc(file=traceback_out)
@@ -121,10 +134,8 @@ def _process(self, request):
'message': str(e),
'traceback': traceback_out.getvalue()
}
result = json.dumps({
'type': 'error',
'payload': payload
})
result = {'type': 'error', 'payload': payload}

return result

def __enter__(self):
@@ -373,7 +373,7 @@ def _main():
# See docstring of `remove_mutable_association_listener` for further details
modeling_types.remove_mutable_association_listener()
try:
ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context'])
except BaseException as e:
messenger.failed(exception=e, tracked_changes=None, new_instances=None)
return
@@ -136,7 +136,7 @@ def stub_args(arg1, arg2, arg3='arg3', arg4='arg4', *args, **kwargs):
kwargs=kwargs)

@pytest.fixture
def ctx(self):
def ctx(self, mocker):
class MockCtx(object):
pass
ctx = MockCtx()
@@ -160,11 +160,13 @@ class MockCtx(object):
ctx.stub_args = self.stub_args
ctx.stub_attr = self.StubAttribute()
ctx.node = self.NodeAttribute(properties)
ctx.model = mocker.MagicMock()
return ctx

@pytest.fixture
def server(self, ctx):
result = ctx_proxy.server.CtxProxy(ctx)
result._close_session = lambda *args, **kwargs: {}
yield result
result.close()

@@ -69,7 +69,7 @@ def __getattr__(self, item):
return None

@classmethod
def deserialize_from_dict(cls, **kwargs):
def instantiate_from_dict(cls, **kwargs):
if kwargs:
return cls(storage=aria.application_model_storage(**kwargs))
else:

0 comments on commit 0c98684

Please sign in to comment.