Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connect/eaas/runner/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
BACKGROUND_TASK_MAX_EXECUTION_TIME = 300
INTERACTIVE_TASK_MAX_EXECUTION_TIME = 120
SCHEDULED_TASK_MAX_EXECUTION_TIME = 60 * 60 * 12
TRANSFORMATION_TASK_MAX_EXECUTION_TIME = 60 * 60 * 4
TRANSFORMATION_TASK_MAX_EXECUTION_TIME = 60 * 60 * 16
ROW_TRANSFORMATION_TASK_MAX_EXECUTION_TIME = 90
RESULT_SENDER_MAX_RETRIES = 5
RESULT_SENDER_WAIT_GRACE_SECONDS = 90
Expand Down
6 changes: 3 additions & 3 deletions connect/eaas/runner/handlers/anvil.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
from connect.client import (
ConnectClient,
)
from connect.eaas.core.logging import (
RequestLogger,
)
from connect.eaas.runner.config import (
ConfigHelper,
)
from connect.eaas.runner.handlers.base import (
ApplicationHandlerBase,
)
from connect.eaas.runner.logging import (
RequestLogger,
)


logger = logging.getLogger(__name__)
Expand Down
4 changes: 3 additions & 1 deletion connect/eaas/runner/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
)
from connect.eaas.core.logging import (
ExtensionLogHandler,
RequestLogger,
)
from connect.eaas.runner.helpers import (
iter_entry_points,
)
from connect.eaas.runner.logging import (
RequestLogger,
)


class ApplicationHandlerBase(ABC):
Expand Down
15 changes: 15 additions & 0 deletions connect/eaas/runner/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from connect.eaas.core.logging import (
RequestLogger as _RequestLogger,
)


class RequestLogger(_RequestLogger):
def log_request(self, method, url, kwargs):
if not self.logger.isEnabledFor(self.level):
return
super().log_request(method, url, kwargs)

def log_response(self, response):
if not self.logger.isEnabledFor(self.level):
return
super().log_response(response)
31 changes: 23 additions & 8 deletions connect/eaas/runner/managers/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@
from connect.eaas.core.enums import (
ResultType,
)
from connect.eaas.core.logging import (
RequestLogger,
)
from connect.eaas.core.proto import (
Task,
TaskOutput,
Expand All @@ -54,6 +51,9 @@
TRANSFORMATION_TASK_MAX_PARALLEL_LINES,
UPLOAD_CHUNK_SIZE,
)
from connect.eaas.runner.logging import (
RequestLogger,
)
from connect.eaas.runner.managers.base import (
TasksManagerBase,
)
Expand Down Expand Up @@ -163,7 +163,7 @@ async def build_response(self, task_data, future):
except Exception as e:
cause = (
str(e) if not isinstance(e, asyncio.TimeoutError)
else 'timed out after {timeout} s'
else f'timed out after {timeout} s'
)
self.log_exception(task_data, e)
await self._fail_task(task_data, cause)
Expand Down Expand Up @@ -251,7 +251,10 @@ async def process_transformation(self, task_data, tfn_request, method):
await client.conversations[task_data.input.object_id].messages.create(
payload={
'type': 'message',
'text': f'Transformation request processing failed: {str(e) or "timed out"}',
'text': (
'Transformation request processing failed: '
f'{self.format_exception_message(e)}'
),
},
)
return TransformationResponse.fail(output=str(e))
Expand Down Expand Up @@ -381,7 +384,6 @@ async def process_rows(self, read_queue, result_store, method, tfn_request, logg
async def transform_row(self, method, row_idx, row, row_styles):
try:
if ROW_DELETED_MARKER in list(row.values()):
# await result_store.put((row_idx, RowTransformationResponse.delete()))
return RowTransformationResponse.delete()
kwargs = {}
if 'row_styles' in inspect.signature(method).parameters:
Expand All @@ -394,19 +396,24 @@ async def transform_row(self, method, row_idx, row, row_styles):
self.executor,
functools.partial(method, row, **kwargs),
)
timeout = self.config.get_timeout('row_transformation')
response = await asyncio.wait_for(
awaitable,
timeout=self.config.get_timeout('row_transformation'),
timeout=timeout,
)
if not isinstance(response, RowTransformationResponse):
raise RowTransformationError(f'invalid row tranformation response: {response}')
if response.status == ResultType.FAIL:
raise RowTransformationError(f'row transformation failed: {response.output}')
return response
except Exception as e:
cause = (
str(e) if not isinstance(e, asyncio.TimeoutError)
else f'timed out after {timeout} s'
)
raise RowTransformationError(
f'Error applying transformation function {method.__name__} '
f'to row #{row_idx}: {str(e)}.',
f'to row #{row_idx}: {cause}.',
) from e

def write_excel(
Expand Down Expand Up @@ -525,3 +532,11 @@ async def chunks_iterator(): # pragma: no cover
payload={'files': {'output': {'id': media_file_id}}},
)
await client(ns).requests[task_data.input.object_id]('process').post()

def format_exception_message(self, e):
if isinstance(e, asyncio.CancelledError):
return 'cancelled'
elif isinstance(e, asyncio.TimeoutError):
return 'timed out'
else:
return str(e) or repr(e)
8 changes: 2 additions & 6 deletions connect/eaas/runner/workers/anvil.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
import logging
import signal

from devtools import (
pformat,
)

from connect.eaas.core.proto import (
Message,
MessageType,
Expand Down Expand Up @@ -54,15 +50,15 @@ def get_setup_request(self):
runner_version=get_version(),
),
)
logger.debug(f'Sending setup request: {pformat(msg)}')
logger.debug(f'Sending setup request: {self.prettify(msg)}')
return msg.dict()

async def stopping(self):
pass

async def process_message(self, data):
message = Message.deserialize(data)
logger.debug(f'Received message: {pformat(message)}')
logger.debug(f'Received message: {self.prettify(message)}')
if message.message_type == MessageType.SETUP_RESPONSE:
await self.process_setup_response(message.data)
self.handler.start()
Expand Down
7 changes: 6 additions & 1 deletion connect/eaas/runner/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def stop(self):
async def send_shutdown(self):
try:
msg = Message(version=2, message_type=MessageType.SHUTDOWN)
logger.debug(f'Sending message: {pformat(msg)}')
logger.debug(f'Sending message: {self.prettify(msg)}')
await self.send(msg.serialize())
except ConnectionClosedError:
pass
Expand Down Expand Up @@ -331,6 +331,11 @@ async def stopping(self):
async def process_message(self, data):
raise NotImplementedError()

def prettify(self, msg):
if logger.isEnabledFor(logging.DEBUG):
return pformat(msg)
return '<...>'

def _backoff_shutdown(self, _):
if not self.run_event.is_set():
logger.info(f'{self} exiting, stop backoff loop')
Expand Down
10 changes: 3 additions & 7 deletions connect/eaas/runner/workers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
import logging
import signal

from devtools import (
pformat,
)

from connect.eaas.core.proto import (
Message,
MessageType,
Expand Down Expand Up @@ -103,15 +99,15 @@ def get_setup_request(self):
runner_version=get_version(),
),
)
logger.debug(f'Sending setup request: {pformat(msg)}')
logger.debug(f'Sending setup request: {self.prettify(msg)}')
return msg.dict()

async def process_message(self, data):
"""
Process a message received from the websocket server.
"""
message = Message.deserialize(data)
logger.debug(f'Received message: {pformat(message)}')
logger.debug(f'Received message: {self.prettify(message)}')
if message.message_type == MessageType.SETUP_RESPONSE:
await self.process_setup_response(message.data)
elif message.message_type == MessageType.TASK:
Expand Down Expand Up @@ -156,7 +152,7 @@ async def result_sender(self): # noqa: CCR001
data=result,
)
await self.ensure_connection()
logger.debug(f'Sending message: {pformat(message)}')
logger.debug(f'Sending message: {self.prettify(message)}')
await self.send(message.serialize())
logger.info(f'Result for task {result.options.task_id} has been sent.')
break
Expand Down
10 changes: 3 additions & 7 deletions connect/eaas/runner/workers/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
import logging
import signal

from devtools import (
pformat,
)

from connect.eaas.core.proto import (
Message,
MessageType,
Expand Down Expand Up @@ -70,7 +66,7 @@ def get_setup_request(self):
runner_version=get_version(),
),
)
logger.debug(f'Sending setup request: {pformat(msg)}')
logger.debug(f'Sending setup request: {self.prettify(msg)}')
return msg.dict()

async def stopping(self):
Expand All @@ -92,7 +88,7 @@ async def stopping(self):

async def process_message(self, data):
message = Message.deserialize(data)
logger.debug(f'Received message: {pformat(message)}')
logger.debug(f'Received message: {self.prettify(message)}')
if message.message_type == MessageType.SETUP_RESPONSE:
await self.process_setup_response(message.data)
elif message.message_type == MessageType.TASK:
Expand Down Expand Up @@ -132,7 +128,7 @@ async def result_sender(self): # noqa: CCR001
data=result,
)
await self.ensure_connection()
logger.debug(f'Sending message: {pformat(message)}')
logger.debug(f'Sending message: {self.prettify(message)}')
await self.send(message.serialize())
logger.info(
f'Result for transformation task {result.options.task_id} has been sent.',
Expand Down
9 changes: 3 additions & 6 deletions connect/eaas/runner/workers/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import signal

import httpx
from devtools import (
pformat,
)

from connect.eaas.core.proto import (
HttpRequest,
Expand Down Expand Up @@ -63,15 +60,15 @@ def get_setup_request(self):
proxied_connect_api=self.handler.proxied_connect_api,
),
)
logger.debug(f'Sending setup request: {pformat(msg)}')
logger.debug(f'Sending setup request: {self.prettify(msg)}')
return msg.dict()

async def stopping(self):
pass

async def process_message(self, data):
message = Message.deserialize(data)
logger.debug(f'Received message: {pformat(message)}')
logger.debug(f'Received message: {self.prettify(message)}')
if message.message_type == MessageType.SETUP_RESPONSE:
await self.process_setup_response(message.data)
elif message.message_type == MessageType.WEB_TASK:
Expand Down Expand Up @@ -184,7 +181,7 @@ def build_response(self, task, status, headers, body):
message_type=MessageType.WEB_TASK,
data=task_response,
)
logger.debug(f'Sending message: {pformat(message)}')
logger.debug(f'Sending message: {self.prettify(message)}')
return message.serialize()


Expand Down
20 changes: 20 additions & 0 deletions tests/managers/test_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,3 +739,23 @@ def test_generate_output_row_invalid_status(mocker):
manager.generate_output_row(mocker.MagicMock(), column_names, response)

assert str(cv.value) == 'Invalid row transformation response status: reschedule.'


def test_format_exception_msg(mocker):
manager = TransformationTasksManager(mocker.MagicMock(), mocker.MagicMock(), mocker.MagicMock())

assert manager.format_exception_message(
asyncio.CancelledError(asyncio.Future()),
) == 'cancelled'

assert manager.format_exception_message(
asyncio.TimeoutError(20),
) == 'timed out'

assert manager.format_exception_message(
Exception('hello'),
) == 'hello'

assert manager.format_exception_message(
Exception(),
) == 'Exception()'
44 changes: 44 additions & 0 deletions tests/test_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging

from connect.eaas.core.logging import (
RequestLogger as RLBase,
)
from connect.eaas.runner.logging import (
RequestLogger,
)


def test_log_request(mocker):
mocked_log_req = mocker.patch.object(RLBase, 'log_request')
logger = logging.getLogger('test')
logger.setLevel(logging.INFO)

rl = RequestLogger(logger, logging.DEBUG)

rl.log_request('method', 'url', {'a': 'b'})

mocked_log_req.assert_not_called()

logger.setLevel(logging.DEBUG)

rl.log_request('method', 'url', {'a': 'b'})

mocked_log_req.assert_called_once()


def test_log_response(mocker):
mocked_log_req = mocker.patch.object(RLBase, 'log_response')
logger = logging.getLogger('test')
logger.setLevel(logging.INFO)

rl = RequestLogger(logger, logging.DEBUG)

rl.log_response('response')

mocked_log_req.assert_not_called()

logger.setLevel(logging.DEBUG)

rl.log_response('response')

mocked_log_req.assert_called_once()
16 changes: 16 additions & 0 deletions tests/workers/test_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -1261,3 +1261,19 @@ def test_url(self):
}
worker.stop()
await task


def test_prettify(mocker):
logger = logging.getLogger('connect.eaas.runner')
worker = WebWorker(
mocker.MagicMock(),
mocker.MagicMock(),
mocker.MagicMock(),
mocker.MagicMock(),
)

logger.setLevel(logging.DEBUG)
assert worker.prettify('message') == "'message'"

logger.setLevel(logging.INFO)
assert worker.prettify('message') == '<...>'