From a9622d26b4f204557ea871a7332a106b5b567d49 Mon Sep 17 00:00:00 2001 From: Francesco Faraone Date: Wed, 4 Oct 2023 20:02:59 +0200 Subject: [PATCH] LITE-28771 increase transformation task timeout to 16h; fix logging mem consumption; improve exception handling' --- connect/eaas/runner/constants.py | 2 +- connect/eaas/runner/handlers/anvil.py | 6 +-- connect/eaas/runner/handlers/base.py | 4 +- connect/eaas/runner/logging.py | 15 +++++++ .../eaas/runner/managers/transformation.py | 31 +++++++++---- connect/eaas/runner/workers/anvil.py | 8 +--- connect/eaas/runner/workers/base.py | 7 ++- connect/eaas/runner/workers/events.py | 10 ++--- .../eaas/runner/workers/transformations.py | 10 ++--- connect/eaas/runner/workers/web.py | 9 ++-- tests/managers/test_transformation.py | 20 +++++++++ tests/test_logging.py | 44 +++++++++++++++++++ tests/workers/test_web.py | 16 +++++++ 13 files changed, 142 insertions(+), 40 deletions(-) create mode 100644 connect/eaas/runner/logging.py create mode 100644 tests/test_logging.py diff --git a/connect/eaas/runner/constants.py b/connect/eaas/runner/constants.py index 3869dae..6a4060e 100644 --- a/connect/eaas/runner/constants.py +++ b/connect/eaas/runner/constants.py @@ -98,7 +98,7 @@ BACKGROUND_TASK_MAX_EXECUTION_TIME = 300 INTERACTIVE_TASK_MAX_EXECUTION_TIME = 120 SCHEDULED_TASK_MAX_EXECUTION_TIME = 60 * 60 * 12 -TRANSFORMATION_TASK_MAX_EXECUTION_TIME = 60 * 60 * 4 +TRANSFORMATION_TASK_MAX_EXECUTION_TIME = 60 * 60 * 16 ROW_TRANSFORMATION_TASK_MAX_EXECUTION_TIME = 90 RESULT_SENDER_MAX_RETRIES = 5 RESULT_SENDER_WAIT_GRACE_SECONDS = 90 diff --git a/connect/eaas/runner/handlers/anvil.py b/connect/eaas/runner/handlers/anvil.py index 91533d0..aa20bec 100644 --- a/connect/eaas/runner/handlers/anvil.py +++ b/connect/eaas/runner/handlers/anvil.py @@ -10,15 +10,15 @@ from connect.client import ( ConnectClient, ) -from connect.eaas.core.logging import ( - RequestLogger, -) from connect.eaas.runner.config import ( ConfigHelper, ) from connect.eaas.runner.handlers.base import ( ApplicationHandlerBase, ) +from connect.eaas.runner.logging import ( + RequestLogger, +) logger = logging.getLogger(__name__) diff --git a/connect/eaas/runner/handlers/base.py b/connect/eaas/runner/handlers/base.py index c88fde9..09bed05 100644 --- a/connect/eaas/runner/handlers/base.py +++ b/connect/eaas/runner/handlers/base.py @@ -14,11 +14,13 @@ ) from connect.eaas.core.logging import ( ExtensionLogHandler, - RequestLogger, ) from connect.eaas.runner.helpers import ( iter_entry_points, ) +from connect.eaas.runner.logging import ( + RequestLogger, +) class ApplicationHandlerBase(ABC): diff --git a/connect/eaas/runner/logging.py b/connect/eaas/runner/logging.py new file mode 100644 index 0000000..4359bdd --- /dev/null +++ b/connect/eaas/runner/logging.py @@ -0,0 +1,15 @@ +from connect.eaas.core.logging import ( + RequestLogger as _RequestLogger, +) + + +class RequestLogger(_RequestLogger): + def log_request(self, method, url, kwargs): + if not self.logger.isEnabledFor(self.level): + return + super().log_request(method, url, kwargs) + + def log_response(self, response): + if not self.logger.isEnabledFor(self.level): + return + super().log_response(response) diff --git a/connect/eaas/runner/managers/transformation.py b/connect/eaas/runner/managers/transformation.py index a7d84fd..fac1ce8 100644 --- a/connect/eaas/runner/managers/transformation.py +++ b/connect/eaas/runner/managers/transformation.py @@ -35,9 +35,6 @@ from connect.eaas.core.enums import ( ResultType, ) -from connect.eaas.core.logging import ( - RequestLogger, -) from connect.eaas.core.proto import ( Task, TaskOutput, @@ -54,6 +51,9 @@ TRANSFORMATION_TASK_MAX_PARALLEL_LINES, UPLOAD_CHUNK_SIZE, ) +from connect.eaas.runner.logging import ( + RequestLogger, +) from connect.eaas.runner.managers.base import ( TasksManagerBase, ) @@ -163,7 +163,7 @@ async def build_response(self, task_data, future): except Exception as e: cause = ( str(e) if not isinstance(e, asyncio.TimeoutError) - else 'timed out after {timeout} s' + else f'timed out after {timeout} s' ) self.log_exception(task_data, e) await self._fail_task(task_data, cause) @@ -251,7 +251,10 @@ async def process_transformation(self, task_data, tfn_request, method): await client.conversations[task_data.input.object_id].messages.create( payload={ 'type': 'message', - 'text': f'Transformation request processing failed: {str(e) or "timed out"}', + 'text': ( + 'Transformation request processing failed: ' + f'{self.format_exception_message(e)}' + ), }, ) return TransformationResponse.fail(output=str(e)) @@ -381,7 +384,6 @@ async def process_rows(self, read_queue, result_store, method, tfn_request, logg async def transform_row(self, method, row_idx, row, row_styles): try: if ROW_DELETED_MARKER in list(row.values()): - # await result_store.put((row_idx, RowTransformationResponse.delete())) return RowTransformationResponse.delete() kwargs = {} if 'row_styles' in inspect.signature(method).parameters: @@ -394,9 +396,10 @@ async def transform_row(self, method, row_idx, row, row_styles): self.executor, functools.partial(method, row, **kwargs), ) + timeout = self.config.get_timeout('row_transformation') response = await asyncio.wait_for( awaitable, - timeout=self.config.get_timeout('row_transformation'), + timeout=timeout, ) if not isinstance(response, RowTransformationResponse): raise RowTransformationError(f'invalid row tranformation response: {response}') @@ -404,9 +407,13 @@ async def transform_row(self, method, row_idx, row, row_styles): raise RowTransformationError(f'row transformation failed: {response.output}') return response except Exception as e: + cause = ( + str(e) if not isinstance(e, asyncio.TimeoutError) + else f'timed out after {timeout} s' + ) raise RowTransformationError( f'Error applying transformation function {method.__name__} ' - f'to row #{row_idx}: {str(e)}.', + f'to row #{row_idx}: {cause}.', ) from e def write_excel( @@ -525,3 +532,11 @@ async def chunks_iterator(): # pragma: no cover payload={'files': {'output': {'id': media_file_id}}}, ) await client(ns).requests[task_data.input.object_id]('process').post() + + def format_exception_message(self, e): + if isinstance(e, asyncio.CancelledError): + return 'cancelled' + elif isinstance(e, asyncio.TimeoutError): + return 'timed out' + else: + return str(e) or repr(e) diff --git a/connect/eaas/runner/workers/anvil.py b/connect/eaas/runner/workers/anvil.py index 3b2706e..847dfb3 100644 --- a/connect/eaas/runner/workers/anvil.py +++ b/connect/eaas/runner/workers/anvil.py @@ -7,10 +7,6 @@ import logging import signal -from devtools import ( - pformat, -) - from connect.eaas.core.proto import ( Message, MessageType, @@ -54,7 +50,7 @@ def get_setup_request(self): runner_version=get_version(), ), ) - logger.debug(f'Sending setup request: {pformat(msg)}') + logger.debug(f'Sending setup request: {self.prettify(msg)}') return msg.dict() async def stopping(self): @@ -62,7 +58,7 @@ async def stopping(self): async def process_message(self, data): message = Message.deserialize(data) - logger.debug(f'Received message: {pformat(message)}') + logger.debug(f'Received message: {self.prettify(message)}') if message.message_type == MessageType.SETUP_RESPONSE: await self.process_setup_response(message.data) self.handler.start() diff --git a/connect/eaas/runner/workers/base.py b/connect/eaas/runner/workers/base.py index 5981ce0..8240ae6 100644 --- a/connect/eaas/runner/workers/base.py +++ b/connect/eaas/runner/workers/base.py @@ -303,7 +303,7 @@ def stop(self): async def send_shutdown(self): try: msg = Message(version=2, message_type=MessageType.SHUTDOWN) - logger.debug(f'Sending message: {pformat(msg)}') + logger.debug(f'Sending message: {self.prettify(msg)}') await self.send(msg.serialize()) except ConnectionClosedError: pass @@ -331,6 +331,11 @@ async def stopping(self): async def process_message(self, data): raise NotImplementedError() + def prettify(self, msg): + if logger.isEnabledFor(logging.DEBUG): + return pformat(msg) + return '<...>' + def _backoff_shutdown(self, _): if not self.run_event.is_set(): logger.info(f'{self} exiting, stop backoff loop') diff --git a/connect/eaas/runner/workers/events.py b/connect/eaas/runner/workers/events.py index 71170ae..5bce263 100644 --- a/connect/eaas/runner/workers/events.py +++ b/connect/eaas/runner/workers/events.py @@ -7,10 +7,6 @@ import logging import signal -from devtools import ( - pformat, -) - from connect.eaas.core.proto import ( Message, MessageType, @@ -103,7 +99,7 @@ def get_setup_request(self): runner_version=get_version(), ), ) - logger.debug(f'Sending setup request: {pformat(msg)}') + logger.debug(f'Sending setup request: {self.prettify(msg)}') return msg.dict() async def process_message(self, data): @@ -111,7 +107,7 @@ async def process_message(self, data): Process a message received from the websocket server. """ message = Message.deserialize(data) - logger.debug(f'Received message: {pformat(message)}') + logger.debug(f'Received message: {self.prettify(message)}') if message.message_type == MessageType.SETUP_RESPONSE: await self.process_setup_response(message.data) elif message.message_type == MessageType.TASK: @@ -156,7 +152,7 @@ async def result_sender(self): # noqa: CCR001 data=result, ) await self.ensure_connection() - logger.debug(f'Sending message: {pformat(message)}') + logger.debug(f'Sending message: {self.prettify(message)}') await self.send(message.serialize()) logger.info(f'Result for task {result.options.task_id} has been sent.') break diff --git a/connect/eaas/runner/workers/transformations.py b/connect/eaas/runner/workers/transformations.py index 97a731a..e0cfd33 100644 --- a/connect/eaas/runner/workers/transformations.py +++ b/connect/eaas/runner/workers/transformations.py @@ -7,10 +7,6 @@ import logging import signal -from devtools import ( - pformat, -) - from connect.eaas.core.proto import ( Message, MessageType, @@ -70,7 +66,7 @@ def get_setup_request(self): runner_version=get_version(), ), ) - logger.debug(f'Sending setup request: {pformat(msg)}') + logger.debug(f'Sending setup request: {self.prettify(msg)}') return msg.dict() async def stopping(self): @@ -92,7 +88,7 @@ async def stopping(self): async def process_message(self, data): message = Message.deserialize(data) - logger.debug(f'Received message: {pformat(message)}') + logger.debug(f'Received message: {self.prettify(message)}') if message.message_type == MessageType.SETUP_RESPONSE: await self.process_setup_response(message.data) elif message.message_type == MessageType.TASK: @@ -132,7 +128,7 @@ async def result_sender(self): # noqa: CCR001 data=result, ) await self.ensure_connection() - logger.debug(f'Sending message: {pformat(message)}') + logger.debug(f'Sending message: {self.prettify(message)}') await self.send(message.serialize()) logger.info( f'Result for transformation task {result.options.task_id} has been sent.', diff --git a/connect/eaas/runner/workers/web.py b/connect/eaas/runner/workers/web.py index 51752e1..e1183cc 100644 --- a/connect/eaas/runner/workers/web.py +++ b/connect/eaas/runner/workers/web.py @@ -11,9 +11,6 @@ import signal import httpx -from devtools import ( - pformat, -) from connect.eaas.core.proto import ( HttpRequest, @@ -63,7 +60,7 @@ def get_setup_request(self): proxied_connect_api=self.handler.proxied_connect_api, ), ) - logger.debug(f'Sending setup request: {pformat(msg)}') + logger.debug(f'Sending setup request: {self.prettify(msg)}') return msg.dict() async def stopping(self): @@ -71,7 +68,7 @@ async def stopping(self): async def process_message(self, data): message = Message.deserialize(data) - logger.debug(f'Received message: {pformat(message)}') + logger.debug(f'Received message: {self.prettify(message)}') if message.message_type == MessageType.SETUP_RESPONSE: await self.process_setup_response(message.data) elif message.message_type == MessageType.WEB_TASK: @@ -184,7 +181,7 @@ def build_response(self, task, status, headers, body): message_type=MessageType.WEB_TASK, data=task_response, ) - logger.debug(f'Sending message: {pformat(message)}') + logger.debug(f'Sending message: {self.prettify(message)}') return message.serialize() diff --git a/tests/managers/test_transformation.py b/tests/managers/test_transformation.py index 9560223..130a30a 100644 --- a/tests/managers/test_transformation.py +++ b/tests/managers/test_transformation.py @@ -739,3 +739,23 @@ def test_generate_output_row_invalid_status(mocker): manager.generate_output_row(mocker.MagicMock(), column_names, response) assert str(cv.value) == 'Invalid row transformation response status: reschedule.' + + +def test_format_exception_msg(mocker): + manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock()) + + assert manager.format_exception_message( + asyncio.CancelledError(asyncio.Future()), + ) == 'cancelled' + + assert manager.format_exception_message( + asyncio.TimeoutError(20), + ) == 'timed out' + + assert manager.format_exception_message( + Exception('hello'), + ) == 'hello' + + assert manager.format_exception_message( + Exception(), + ) == 'Exception()' diff --git a/tests/test_logging.py b/tests/test_logging.py new file mode 100644 index 0000000..1017f8c --- /dev/null +++ b/tests/test_logging.py @@ -0,0 +1,44 @@ +import logging + +from connect.eaas.core.logging import ( + RequestLogger as RLBase, +) +from connect.eaas.runner.logging import ( + RequestLogger, +) + + +def test_log_request(mocker): + mocked_log_req = mocker.patch.object(RLBase, 'log_request') + logger = logging.getLogger('test') + logger.setLevel(logging.INFO) + + rl = RequestLogger(logger, logging.DEBUG) + + rl.log_request('method', 'url', {'a': 'b'}) + + mocked_log_req.assert_not_called() + + logger.setLevel(logging.DEBUG) + + rl.log_request('method', 'url', {'a': 'b'}) + + mocked_log_req.assert_called_once() + + +def test_log_response(mocker): + mocked_log_req = mocker.patch.object(RLBase, 'log_response') + logger = logging.getLogger('test') + logger.setLevel(logging.INFO) + + rl = RequestLogger(logger, logging.DEBUG) + + rl.log_response('response') + + mocked_log_req.assert_not_called() + + logger.setLevel(logging.DEBUG) + + rl.log_response('response') + + mocked_log_req.assert_called_once() diff --git a/tests/workers/test_web.py b/tests/workers/test_web.py index c1ee224..a268bf2 100644 --- a/tests/workers/test_web.py +++ b/tests/workers/test_web.py @@ -1261,3 +1261,19 @@ def test_url(self): } worker.stop() await task + + +def test_prettify(mocker): + logger = logging.getLogger('connect.eaas.runner') + worker = WebWorker( + mocker.MagicMock(), + mocker.MagicMock(), + mocker.MagicMock(), + mocker.MagicMock(), + ) + + logger.setLevel(logging.DEBUG) + assert worker.prettify('message') == "'message'" + + logger.setLevel(logging.INFO) + assert worker.prettify('message') == '<...>'