From 0c986842d52eca823ab92442dd9d77267e369ae8 Mon Sep 17 00:00:00 2001 From: max-orlov Date: Mon, 22 May 2017 18:28:12 +0300 Subject: [PATCH] NullPool logging messages appear during execution --- aria/orchestrator/context/common.py | 1 - aria/orchestrator/context/operation.py | 13 ++++++- .../execution_plugin/ctx_proxy/client.py | 22 ++++++----- .../execution_plugin/ctx_proxy/server.py | 37 ++++++++++++------- .../workflows/executor/process.py | 2 +- .../execution_plugin/test_ctx_proxy_server.py | 4 +- .../workflows/executor/__init__.py | 2 +- 7 files changed, 52 insertions(+), 29 deletions(-) diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 0854a272..c98e0265 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -16,7 +16,6 @@ """ A common context for both workflow and operation """ - import logging from contextlib import contextmanager from functools import partial diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 68a02aa3..0ce790ff 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -33,6 +33,7 @@ def __init__(self, task_id, actor_id, **kwargs): self._task_id = task_id self._actor_id = actor_id self._thread_local = threading.local() + self._destroy_session = kwargs.pop('destroy_session', False) logger_level = kwargs.pop('logger_level', None) super(BaseOperationContext, self).__init__(**kwargs) self._register_logger(task_id=self.task.id, level=logger_level) @@ -90,13 +91,21 @@ def serialization_dict(self): } @classmethod - def deserialize_from_dict(cls, model_storage=None, resource_storage=None, **kwargs): + def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs): if model_storage: model_storage = aria.application_model_storage(**model_storage) if resource_storage: resource_storage = aria.application_resource_storage(**resource_storage) - return cls(model_storage=model_storage, resource_storage=resource_storage, **kwargs) + return cls(model_storage=model_storage, + resource_storage=resource_storage, + destroy_session=True, + **kwargs) + + def close(self): + if self._destroy_session: + self.model.log._session.remove() + self.model.log._engine.dispose() class NodeOperationContext(BaseOperationContext): diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py index d965a5ea..f7f56aa8 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py @@ -34,22 +34,25 @@ def __init__(self, ex_message, ex_type, ex_traceback): self.ex_traceback = ex_traceback -def _http_request(socket_url, request, timeout): - response = urllib2.urlopen( - url=socket_url, - data=json.dumps(request), - timeout=timeout) +def _http_request(socket_url, request, method, timeout): + opener = urllib2.build_opener(urllib2.HTTPHandler) + request = urllib2.Request(socket_url, data=json.dumps(request)) + request.get_method = lambda: method + response = opener.open(request, timeout=timeout) + if response.code != 200: raise RuntimeError('Request failed: {0}'.format(response)) return json.loads(response.read()) -def _client_request(socket_url, args, timeout): +def _client_request(socket_url, args, timeout, method='POST'): response = _http_request( socket_url=socket_url, request={'args': args}, - timeout=timeout) - payload = response['payload'] + method=method, + timeout=timeout + ) + payload = response.get('payload') response_type = response.get('type') if response_type == 'error': ex_type = payload['type'] @@ -89,7 +92,7 @@ def _process_args(json_prefix, args): def main(args=None): args = _parse_args(args) response = _client_request( - socket_url=args.socket_url, + args.socket_url, args=_process_args(args.json_arg_prefix, args.args), timeout=args.timeout) if args.json_output: @@ -100,6 +103,5 @@ def main(args=None): response = str(response) sys.stdout.write(response) - if __name__ == '__main__': main() diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 52a53121..1ce0e08b 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -42,16 +42,31 @@ def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)): self._started.get(timeout=5) def _start_server(self): - proxy = self class BottleServerAdapter(bottle.ServerAdapter): + proxy = self + + def close_session(self): + self.proxy.ctx.model.log._session.remove() + def run(self, app): + class Server(wsgiref.simple_server.WSGIServer): allow_reuse_address = True + bottle_server = self def handle_error(self, request, client_address): pass + def serve_forever(self, poll_interval=0.5): + try: + wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval) + finally: + # Once shutdown is called, we need to close the session. + # If the session is not closed properly, it might raise warnings, + # or even lock the database. + self.bottle_server.close_session() + class Handler(wsgiref.simple_server.WSGIRequestHandler): def address_string(self): return self.client_address[0] @@ -66,8 +81,8 @@ def log_request(*args, **kwargs): # pylint: disable=no-method-argument app=app, server_class=Server, handler_class=Handler) - proxy.server = server - proxy._started.put(True) + self.proxy.server = server + self.proxy._started.put(True) server.serve_forever(poll_interval=0.1) def serve(): @@ -96,9 +111,10 @@ def _request_handler(self): request = bottle.request.body.read() # pylint: disable=no-member response = self._process(request) return bottle.LocalResponse( - body=response, + body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder), status=200, - headers={'content-type': 'application/json'}) + headers={'content-type': 'application/json'} + ) def _process(self, request): try: @@ -109,10 +125,7 @@ def _process(self, request): if isinstance(payload, exceptions.ScriptException): payload = dict(message=str(payload)) result_type = 'stop_operation' - result = json.dumps({ - 'type': result_type, - 'payload': payload - }, cls=modeling.utils.ModelJSONEncoder) + result = {'type': result_type, 'payload': payload} except Exception as e: traceback_out = StringIO.StringIO() traceback.print_exc(file=traceback_out) @@ -121,10 +134,8 @@ def _process(self, request): 'message': str(e), 'traceback': traceback_out.getvalue() } - result = json.dumps({ - 'type': 'error', - 'payload': payload - }) + result = {'type': 'error', 'payload': payload} + return result def __enter__(self): diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 824c4e14..da6bbb28 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -373,7 +373,7 @@ def _main(): # See docstring of `remove_mutable_association_listener` for further details modeling_types.remove_mutable_association_listener() try: - ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) + ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context']) except BaseException as e: messenger.failed(exception=e, tracked_changes=None, new_instances=None) return diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py index 98ceff95..1b19fd97 100644 --- a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py +++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py @@ -136,7 +136,7 @@ def stub_args(arg1, arg2, arg3='arg3', arg4='arg4', *args, **kwargs): kwargs=kwargs) @pytest.fixture - def ctx(self): + def ctx(self, mocker): class MockCtx(object): pass ctx = MockCtx() @@ -160,11 +160,13 @@ class MockCtx(object): ctx.stub_args = self.stub_args ctx.stub_attr = self.StubAttribute() ctx.node = self.NodeAttribute(properties) + ctx.model = mocker.MagicMock() return ctx @pytest.fixture def server(self, ctx): result = ctx_proxy.server.CtxProxy(ctx) + result._close_session = lambda *args, **kwargs: {} yield result result.close() diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 8ad8edb7..375c44e5 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -69,7 +69,7 @@ def __getattr__(self, item): return None @classmethod - def deserialize_from_dict(cls, **kwargs): + def instantiate_from_dict(cls, **kwargs): if kwargs: return cls(storage=aria.application_model_storage(**kwargs)) else: