From 638e2c9b5446235824c618c2e38a94bdddca9953 Mon Sep 17 00:00:00 2001 From: Francesco Faraone Date: Thu, 12 Aug 2021 13:32:38 +0200 Subject: [PATCH] LITE-19830 add extra metadata to logging extras --- connect/eaas/dataclasses.py | 70 +++++++-------- connect/eaas/manager.py | 13 +-- connect/eaas/worker.py | 42 +++++++-- tests/test_dataclasses.py | 113 ++++++++++++++++-------- tests/test_manager.py | 21 ++--- tests/test_worker.py | 170 +++++++++++++++++++++--------------- 6 files changed, 259 insertions(+), 170 deletions(-) diff --git a/connect/eaas/dataclasses.py b/connect/eaas/dataclasses.py index 10f876d..8c4299a 100644 --- a/connect/eaas/dataclasses.py +++ b/connect/eaas/dataclasses.py @@ -48,58 +48,52 @@ class TaskPayload: task_category: str task_type: str object_id: str - result: str = None + result: Optional[str] = None data: Any = None countdown: int = 0 - output: str = None - correlation_id: str = None - reply_to: str = None - - def to_json(self): - return dataclasses.asdict(self) + output: Optional[str] = None + correlation_id: Optional[str] = None + reply_to: Optional[str] = None @dataclasses.dataclass class ConfigurationPayload: - configuration: dict = None - logging_api_key: str = None - environment_type: str = None - log_level: str = None - runner_log_level: str = None - - def to_json(self): - return dataclasses.asdict(self) + configuration: Optional[dict] = None + logging_api_key: Optional[str] = None + environment_type: Optional[str] = None + account_id: Optional[str] = None + account_name: Optional[str] = None + log_level: Optional[str] = None + runner_log_level: Optional[str] = None @dataclasses.dataclass class CapabilitiesPayload: capabilities: dict - readme_url: str = None - changelog_url: str = None + readme_url: Optional[str] = None + changelog_url: Optional[str] = None - def to_json(self): - return dataclasses.asdict(self) - -@dataclasses.dataclass(init=False) +@dataclasses.dataclass class Message: message_type: str data: Optional[Union[CapabilitiesPayload, ConfigurationPayload, TaskPayload]] = None - def __init__(self, message_type=None, data=None): - self.message_type = message_type - if isinstance(data, dict): - if self.message_type == MessageType.CONFIGURATION: - self.data = ConfigurationPayload(**data) - elif self.message_type == MessageType.TASK: - self.data = TaskPayload(**data) - elif self.message_type == MessageType.CAPABILITIES: - self.data = CapabilitiesPayload(**data) - else: - self.data = data - - def to_json(self): - payload = {'message_type': self.message_type} - if self.data: - payload['data'] = dataclasses.asdict(self.data) - return payload + +def from_dict(cls, data): + field_names = set(f.name for f in dataclasses.fields(cls)) + return cls(**{k: v for k, v in data.items() if k in field_names}) + + +def parse_message(payload): + message_type = payload['message_type'] + if message_type == MessageType.CONFIGURATION: + data = from_dict(ConfigurationPayload, payload.get('data')) + elif message_type == MessageType.TASK: + data = from_dict(TaskPayload, payload.get('data')) + elif message_type == MessageType.CAPABILITIES: + data = from_dict(CapabilitiesPayload, payload.get('data')) + else: + data = payload.get('data') + + return Message(message_type=message_type, data=data) diff --git a/connect/eaas/manager.py b/connect/eaas/manager.py index bf43f44..150a911 100644 --- a/connect/eaas/manager.py +++ b/connect/eaas/manager.py @@ -4,6 +4,7 @@ # Copyright (c) 2021 Ingram Micro. All Rights Reserved. # import asyncio +import dataclasses import inspect import logging import traceback @@ -90,7 +91,7 @@ async def submit_task(self, data): object_id = data.object_id task_type = data.task_type method_name = TASK_TYPE_EXT_METHOD_MAP[task_type] - extension = self.worker.get_extension() + extension = self.worker.get_extension(data.task_id) method = getattr(extension, method_name) logger.debug(f'invoke {method_name}') self.running_tasks += 1 @@ -175,7 +176,7 @@ async def result_sender(self): # noqa: CCR001 message_type=MessageType.TASK, data=result, ) - await self.worker.send(message.to_json()) + await self.worker.send(dataclasses.asdict(message)) logger.info(f'Result for task {result.task_id} has been sent.') break except Exception: @@ -222,13 +223,13 @@ async def build_bg_response(self, task_data, future): """ Wait for a background task to be completed and than uild the task result message. """ - result_message = TaskPayload(**task_data.to_json()) + result_message = TaskPayload(**dataclasses.asdict(task_data)) result = None try: result = await asyncio.wait_for(future, timeout=BACKGROUND_TASK_MAX_EXECUTION_TIME) except Exception as e: logger.warning(f'Got exception during execution of task {task_data.task_id}: {e}') - self.worker.get_extension().logger.exception( + self.worker.get_extension(task_data.task_id).logger.exception( f'Unhandled exception during execution of task {task_data.task_id}', ) result_message.result = ResultType.RETRY @@ -249,12 +250,12 @@ async def build_interactive_response(self, task_data, future): Wait for an interactive task to be completed and than uild the task result message. """ result = None - result_message = TaskPayload(**task_data.to_json()) + result_message = TaskPayload(**dataclasses.asdict(task_data)) try: result = await asyncio.wait_for(future, timeout=INTERACTIVE_TASK_MAX_EXECUTION_TIME) except Exception as e: logger.warning(f'Got exception during execution of task {task_data.task_id}: {e}') - self.worker.get_extension().logger.exception( + self.worker.get_extension(task_data.task_id).logger.exception( f'Unhandled exception during execution of task {task_data.task_id}', ) result_message.result = ResultType.FAIL diff --git a/connect/eaas/worker.py b/connect/eaas/worker.py index 918ebff..531d948 100644 --- a/connect/eaas/worker.py +++ b/connect/eaas/worker.py @@ -4,6 +4,7 @@ # Copyright (c) 2021 Ingram Micro. All Rights Reserved. # import asyncio +import dataclasses import json import logging from asyncio.exceptions import TimeoutError @@ -17,13 +18,21 @@ ) from connect.client import AsyncConnectClient, ConnectClient -from connect.eaas.dataclasses import CapabilitiesPayload, Message, MessageType +from connect.eaas.dataclasses import ( + CapabilitiesPayload, + Message, + MessageType, + parse_message, +) from connect.eaas.helpers import ( get_environment, get_extension_class, get_extension_type, ) -from connect.eaas.logging import ExtensionLogHandler, RequestLogger +from connect.eaas.logging import ( + ExtensionLogHandler, + RequestLogger, +) from connect.eaas.manager import TasksManager @@ -63,6 +72,8 @@ def __init__(self, secure=True): self.paused = False self.logging_handler = None self.environment_type = None + self.account_id = None + self.account_name = None async def ensure_connection(self): """ @@ -93,7 +104,7 @@ async def receive(self): except TimeoutError: pass - def get_client(self): + def get_client(self, task_id): """ Get an instance of the Connect Openapi Client. If the extension is asyncrhonous it returns an instance of the AsyncConnectClient otherwise the ConnectClient. @@ -104,7 +115,10 @@ def get_client(self): endpoint=f'https://{self.api_address}/public/v1', use_specs=False, logger=RequestLogger( - self.get_extension_logger(self.logging_api_key), + logging.LoggerAdapter( + self.get_extension_logger(self.logging_api_key), + {'task_id': task_id}, + ), ), ) @@ -122,6 +136,8 @@ def get_extension_logger(self, token): 'environment_id': self.environment_id, 'instance_id': self.instance_id, 'environment_type': self.environment_type, + 'account_id': self.account_id, + 'account_name': self.account_name, 'api_address': self.api_address, }, ) @@ -151,10 +167,13 @@ def get_url(self): url = f'{self.base_ws_url}/{self.environment_id}/{self.instance_id}' return f'{url}?running_tasks={running_tasks}' - def get_extension(self): + def get_extension(self, task_id): return self.extension_class( - self.get_client(), - self.get_extension_logger(self.logging_api_key), + self.get_client(task_id), + logging.LoggerAdapter( + self.get_extension_logger(self.logging_api_key), + {'task_id': task_id}, + ), self.extension_config, ) @@ -176,7 +195,7 @@ async def run(self): # noqa: CCR001 self.changelog_url, ), ) - await self.send(message.to_json()) + await self.send(dataclasses.asdict(message)) while self.run_event.is_set(): await self.ensure_connection() self.ensure_tasks_manager_running() @@ -208,7 +227,7 @@ async def process_message(self, data): """ Process a message received from the websocket server. """ - message = Message(**data) + message = parse_message(data) if message.message_type == MessageType.CONFIGURATION: await self.configuration(message.data) elif message.message_type == MessageType.TASK: @@ -232,6 +251,11 @@ async def configuration(self, data): self.logging_api_key = data.logging_api_key if data.environment_type: self.environment_type = data.environment_type + if data.account_id: + self.account_id = data.account_id + if data.account_name: + self.account_name = data.account_name + if data.log_level: logger.info(f'Change extesion logger level to {data.log_level}') logging.getLogger('eaas.extension').setLevel( diff --git a/tests/test_dataclasses.py b/tests/test_dataclasses.py index 5007963..f43d801 100644 --- a/tests/test_dataclasses.py +++ b/tests/test_dataclasses.py @@ -1,54 +1,93 @@ +import dataclasses + from connect.eaas.dataclasses import ( CapabilitiesPayload, ConfigurationPayload, + from_dict, Message, MessageType, - + parse_message, + TaskPayload, ) -def test_capabilities_payload(): - assert CapabilitiesPayload( - {'cap1': 'val1'}, - 'https://example.com/readme', - 'https://example.com/changelog', - ).to_json() == { - 'capabilities': {'cap1': 'val1'}, - 'readme_url': 'https://example.com/readme', - 'changelog_url': 'https://example.com/changelog', +def test_from_dict(): + data = { + 'capabilities': {'test': 'data'}, + 'readme_url': 'https://read.me', + 'changelog_url': 'https://change.log', + 'extra': 'data', } + capabilities = from_dict(CapabilitiesPayload, data) + assert capabilities.capabilities == data['capabilities'] + assert capabilities.changelog_url == data['changelog_url'] + assert capabilities.readme_url == data['readme_url'] -def test_configuration_payload(): - assert ConfigurationPayload( - {'conf1': 'val1'}, - 'logging-token', - 'environ-type', - 'log-level', - 'runner-log-level', - ).to_json() == { - 'configuration': {'conf1': 'val1'}, - 'logging_api_key': 'logging-token', - 'environment_type': 'environ-type', - 'log_level': 'log-level', - 'runner_log_level': 'runner-log-level', + +def test_parse_task_message(): + msg_data = { + 'message_type': 'task', + 'data': { + 'task_id': 'task_id', + 'task_category': 'task_category', + 'task_type': 'task_type', + 'object_id': 'object_id', + 'result': 'result', + 'data': {'data': 'value'}, + 'countdown': 10, + 'output': 'output', + 'correlation_id': 'correlation_id', + 'reply_to': 'reply_to', + }, } + message = parse_message(msg_data) + + assert isinstance(message, Message) + assert message.message_type == MessageType.TASK + assert isinstance(message.data, TaskPayload) -def test_message_capabilities(): - cap = CapabilitiesPayload( - {'cap1': 'val1'}, - 'https://example.com/readme', - 'https://example.com/changelog', - ) + assert dataclasses.asdict(message) == msg_data - msg = Message( - MessageType.CAPABILITIES, - cap.to_json(), - ) - assert msg.data == cap - assert msg.to_json() == { - 'message_type': MessageType.CAPABILITIES, - 'data': cap.to_json(), +def test_parse_capabilities_message(): + msg_data = { + 'message_type': 'capabilities', + 'data': { + 'capabilities': {'test': 'data'}, + 'readme_url': 'https://read.me', + 'changelog_url': 'https://change.log', + }, } + + message = parse_message(msg_data) + + assert isinstance(message, Message) + assert message.message_type == MessageType.CAPABILITIES + assert isinstance(message.data, CapabilitiesPayload) + + assert dataclasses.asdict(message) == msg_data + + +def test_parse_configuration_message(): + msg_data = { + 'message_type': 'configuration', + 'data': { + 'configuration': {'conf1': 'val1'}, + 'logging_api_key': 'logging-token', + 'environment_type': 'environ-type', + 'log_level': 'log-level', + 'runner_log_level': 'runner-log-level', + 'account_id': 'account_id', + 'account_name': 'account_name', + }, + } + + message = parse_message(msg_data) + + assert isinstance(message, Message) + assert message.message_type == MessageType.CONFIGURATION + assert isinstance(message.data, ConfigurationPayload) + + assert dataclasses.asdict(message) == msg_data diff --git a/tests/test_manager.py b/tests/test_manager.py index a235cbb..3e70e78 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1,4 +1,5 @@ import asyncio +import dataclasses import logging import pytest @@ -67,7 +68,7 @@ async def test_background_task_sync(mocker, extension_cls, task_type): await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.SUCCESS worker.send.assert_awaited_once_with(json_msg) @@ -107,7 +108,7 @@ async def test_background_task_sync_reschedule(mocker, extension_cls, task_type) await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.RESCHEDULE json_msg['data']['countdown'] = 126 worker.send.assert_awaited_once_with(json_msg) @@ -143,7 +144,7 @@ async def test_background_task_sync_unsupported_status(mocker, extension_cls): await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.SKIP json_msg['data']['output'] = 'The status approved is not supported by the extension.' worker.send.assert_awaited_once_with(json_msg) @@ -181,7 +182,7 @@ async def test_background_task_async(mocker, extension_cls, task_type): await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.SUCCESS worker.send.assert_awaited_once_with(json_msg) @@ -220,7 +221,7 @@ async def test_interactive_task_sync(mocker, extension_cls, task_type): await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.SUCCESS worker.send.assert_awaited_once_with(json_msg) @@ -260,7 +261,7 @@ async def test_interactive_task_async(mocker, extension_cls, task_type): await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.SUCCESS worker.send.assert_awaited_once_with(json_msg) @@ -293,7 +294,7 @@ async def test_background_task_request_error(mocker, extension_cls): await asyncio.sleep(.1) await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = 'retry' json_msg['data']['output'] = 'formatted traceback' worker.send.assert_awaited_once_with(json_msg) @@ -329,7 +330,7 @@ async def test_result_sender_retries(mocker, extension_cls): await asyncio.sleep(.1) await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.SUCCESS assert worker.send.mock_calls[1].args[0] == json_msg @@ -436,7 +437,7 @@ async def test_interactive_task_exception(mocker, extension_cls): await asyncio.sleep(.1) await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.FAIL json_msg['data']['output'] = long_stack_trace[:4000] assert worker.send.mock_calls[1].args[0] == json_msg @@ -472,7 +473,7 @@ async def test_interactive_task_exception_product_action(mocker, extension_cls): await asyncio.sleep(.1) await manager.stop() message = Message(message_type=MessageType.TASK, data=task) - json_msg = message.to_json() + json_msg = dataclasses.asdict(message) json_msg['data']['result'] = ResultType.FAIL json_msg['data']['data'] = { 'http_status': 400, diff --git a/tests/test_worker.py b/tests/test_worker.py index cdc6621..3b5cafe 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,7 +1,9 @@ import asyncio +import dataclasses import logging import pytest +from logzio.sender import LogzioSender from websockets.exceptions import ConnectionClosedError, WebSocketException from connect.eaas.dataclasses import ( @@ -50,17 +52,19 @@ def get_descriptor(cls): mocker.patch('connect.eaas.worker.get_extension_class', return_value=MyExtension) mocker.patch('connect.eaas.worker.get_extension_type', return_value='sync') - data_to_send = Message( - MessageType.CONFIGURATION, - ConfigurationPayload( - { - 'var1': 'value1', - 'var2': 'value2', - }, - 'token', - 'development', + data_to_send = dataclasses.asdict( + Message( + MessageType.CONFIGURATION, + ConfigurationPayload( + { + 'var1': 'value1', + 'var2': 'value2', + }, + 'token', + 'development', + ), ), - ).to_json() + ) handler = WSHandler( '/public/v1/devops/ws/ENV-000-0001/INS-000-0002?running_tasks=0', @@ -77,19 +81,21 @@ def get_descriptor(cls): await task handler.assert_received( - Message( - MessageType.CAPABILITIES, - CapabilitiesPayload( - capabilities, - 'https://example.com/README.md', - 'https://example.com/CHANGELOG.md', + dataclasses.asdict( + Message( + MessageType.CAPABILITIES, + CapabilitiesPayload( + capabilities, + 'https://example.com/README.md', + 'https://example.com/CHANGELOG.md', + ), ), - ).to_json(), + ), ) @pytest.mark.asyncio -async def test_pr_task(mocker, ws_server, unused_port, httpx_mock): +async def test_pr_task(mocker, ws_server, unused_port, httpx_mock, caplog): pr_data = {'id': 'PR-000', 'status': 'pending'} @@ -115,6 +121,11 @@ async def test_pr_task(mocker, ws_server, unused_port, httpx_mock): TaskType.ASSET_PURCHASE_REQUEST_VALIDATION: ['draft'], } + mocked_send_logs = mocker.patch.object( + LogzioSender, + 'append', + ) + class MyExtension(Extension): @classmethod def get_descriptor(cls): @@ -125,6 +136,7 @@ def get_descriptor(cls): } def process_asset_purchase_request(self, request): + self.logger.info('test log message') assert request == pr_data return ProcessingResponse.done() @@ -132,15 +144,15 @@ def process_asset_purchase_request(self, request): mocker.patch('connect.eaas.worker.get_extension_type', return_value='sync') data_to_send = [ - Message(MessageType.CONFIGURATION, ConfigurationPayload( - {'var': 'val'}, 'api_key', 'development', - )).to_json(), - Message(MessageType.TASK, TaskPayload( + dataclasses.asdict(Message(MessageType.CONFIGURATION, ConfigurationPayload( + {'var': 'val'}, 'api_key', 'development', 'account_id', 'account_name', + ))), + dataclasses.asdict(Message(MessageType.TASK, TaskPayload( 'TQ-000', TaskCategory.BACKGROUND, TaskType.ASSET_PURCHASE_REQUEST_PROCESSING, 'PR-000', - )).to_json(), + ))), ] handler = WSHandler( @@ -148,7 +160,6 @@ def process_asset_purchase_request(self, request): data_to_send, ['receive', 'send', 'send', 'receive'], ) - async with ws_server(handler): worker = Worker(secure=False) task = asyncio.create_task(worker.run()) @@ -158,25 +169,38 @@ def process_asset_purchase_request(self, request): await task handler.assert_received( - Message( - MessageType.CAPABILITIES, - CapabilitiesPayload( - capabilities, - 'https://example.com/README.md', - 'https://example.com/CHANGELOG.md', + dataclasses.asdict( + Message( + MessageType.CAPABILITIES, + CapabilitiesPayload( + capabilities, + 'https://example.com/README.md', + 'https://example.com/CHANGELOG.md', + ), ), - ).to_json(), + ), ) handler.assert_received( - Message(MessageType.TASK, TaskPayload( - 'TQ-000', - TaskCategory.BACKGROUND, - TaskType.ASSET_PURCHASE_REQUEST_PROCESSING, - 'PR-000', - result=ResultType.SUCCESS, - )).to_json(), + dataclasses.asdict( + Message(MessageType.TASK, TaskPayload( + 'TQ-000', + TaskCategory.BACKGROUND, + TaskType.ASSET_PURCHASE_REQUEST_PROCESSING, + 'PR-000', + result=ResultType.SUCCESS, + )), + ), ) + log_msg = mocked_send_logs.mock_calls[0].args[0] + assert log_msg['message'] == 'test log message' + assert log_msg['account_id'] == 'account_id' + assert log_msg['account_name'] == 'account_name' + assert log_msg['environment_id'] == 'ENV-000-0001' + assert log_msg['environment_type'] == 'development' + assert log_msg['instance_id'] == 'INS-000-0002' + assert log_msg['task_id'] == 'TQ-000' + @pytest.mark.asyncio async def test_tcr_task(mocker, ws_server, unused_port, httpx_mock): @@ -222,15 +246,17 @@ def process_tier_config_setup_request(self, request): mocker.patch('connect.eaas.worker.get_extension_type', return_value='sync') data_to_send = [ - Message(MessageType.CONFIGURATION, ConfigurationPayload( + dataclasses.asdict(Message(MessageType.CONFIGURATION, ConfigurationPayload( {'var': 'val'}, 'api_key', 'development', - )).to_json(), - Message(MessageType.TASK, TaskPayload( - 'TQ-000', - TaskCategory.BACKGROUND, - TaskType.TIER_CONFIG_SETUP_REQUEST_PROCESSING, - 'TCR-000', - )).to_json(), + ))), + dataclasses.asdict( + Message(MessageType.TASK, TaskPayload( + 'TQ-000', + TaskCategory.BACKGROUND, + TaskType.TIER_CONFIG_SETUP_REQUEST_PROCESSING, + 'TCR-000', + )), + ), ] handler = WSHandler( @@ -248,23 +274,27 @@ def process_tier_config_setup_request(self, request): await task handler.assert_received( - Message( - MessageType.CAPABILITIES, - CapabilitiesPayload( - capabilities, - 'https://example.com/README.md', - 'https://example.com/CHANGELOG.md', + dataclasses.asdict( + Message( + MessageType.CAPABILITIES, + CapabilitiesPayload( + capabilities, + 'https://example.com/README.md', + 'https://example.com/CHANGELOG.md', + ), ), - ).to_json(), + ), ) handler.assert_received( - Message(MessageType.TASK, TaskPayload( - 'TQ-000', - TaskCategory.BACKGROUND, - TaskType.TIER_CONFIG_SETUP_REQUEST_PROCESSING, - 'TCR-000', - result=ResultType.SUCCESS, - )).to_json(), + dataclasses.asdict( + Message(MessageType.TASK, TaskPayload( + 'TQ-000', + TaskCategory.BACKGROUND, + TaskType.TIER_CONFIG_SETUP_REQUEST_PROCESSING, + 'TCR-000', + result=ResultType.SUCCESS, + )), + ), ) @@ -300,10 +330,10 @@ def get_descriptor(cls): mocker.patch('connect.eaas.worker.get_extension_type', return_value='sync') data_to_send = [ - Message(MessageType.CONFIGURATION, ConfigurationPayload( + dataclasses.asdict(Message(MessageType.CONFIGURATION, ConfigurationPayload( {'var': 'val'}, 'api_key', 'development', - )).to_json(), - Message(MessageType.PAUSE).to_json(), + ))), + dataclasses.asdict(Message(MessageType.PAUSE)), ] handler = WSHandler( @@ -354,11 +384,11 @@ def get_descriptor(cls): mocker.patch('connect.eaas.worker.get_extension_type', return_value='sync') data_to_send = [ - Message(MessageType.CONFIGURATION, ConfigurationPayload( + dataclasses.asdict(Message(MessageType.CONFIGURATION, ConfigurationPayload( {'var': 'val'}, 'api_key', 'development', - )).to_json(), - Message(MessageType.PAUSE).to_json(), - Message(MessageType.RESUME).to_json(), + ))), + dataclasses.asdict(Message(MessageType.PAUSE)), + dataclasses.asdict(Message(MessageType.RESUME)), ] handler = WSHandler( @@ -409,10 +439,10 @@ def get_descriptor(cls): mocker.patch('connect.eaas.worker.get_extension_type', return_value='sync') data_to_send = [ - Message(MessageType.CONFIGURATION, ConfigurationPayload( + dataclasses.asdict(Message(MessageType.CONFIGURATION, ConfigurationPayload( {'var': 'val'}, 'api_key', 'development', - )).to_json(), - Message(MessageType.SHUTDOWN).to_json(), + ))), + dataclasses.asdict(Message(MessageType.SHUTDOWN)), ] handler = WSHandler(