From 6c08f17ba43684ed3fe6acb7b2460f542edbfd20 Mon Sep 17 00:00:00 2001 From: Philipp Trenz Date: Wed, 26 Nov 2025 13:59:47 +0100 Subject: [PATCH] Move AssetDetails and MessageHandlers into separate file --- src/s2python/s2_asset_details.py | 55 +++++++++ src/s2python/s2_connection.py | 167 ++-------------------------- src/s2python/s2_message_handlers.py | 122 ++++++++++++++++++++ 3 files changed, 188 insertions(+), 156 deletions(-) create mode 100644 src/s2python/s2_asset_details.py create mode 100644 src/s2python/s2_message_handlers.py diff --git a/src/s2python/s2_asset_details.py b/src/s2python/s2_asset_details.py new file mode 100644 index 0000000..cf05539 --- /dev/null +++ b/src/s2python/s2_asset_details.py @@ -0,0 +1,55 @@ +import logging +import uuid +from dataclasses import dataclass +from typing import Optional, List + +from s2python.common import ( + Role, + ResourceManagerDetails, + Duration, + Currency, +) +from s2python.generated.gen_s2 import CommodityQuantity +from s2python.s2_control_type import S2ControlType + +logger = logging.getLogger("s2python") + + +@dataclass +class AssetDetails: # pylint: disable=too-many-instance-attributes + resource_id: uuid.UUID + + provides_forecast: bool + provides_power_measurements: List[CommodityQuantity] + + instruction_processing_delay: Duration + roles: List[Role] + currency: Optional[Currency] = None + + name: Optional[str] = None + manufacturer: Optional[str] = None + model: Optional[str] = None + firmware_version: Optional[str] = None + serial_number: Optional[str] = None + + def to_resource_manager_details( + self, control_types: List[S2ControlType] + ) -> ResourceManagerDetails: + return ResourceManagerDetails( + available_control_types=[ + control_type.get_protocol_control_type() + for control_type in control_types + ], + currency=self.currency, + firmware_version=self.firmware_version, + instruction_processing_delay=self.instruction_processing_delay, + manufacturer=self.manufacturer, + message_id=uuid.uuid4(), + model=self.model, + name=self.name, + provides_forecast=self.provides_forecast, + provides_power_measurement_types=self.provides_power_measurements, + resource_id=self.resource_id, + roles=self.roles, + serial_number=self.serial_number, + ) diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py index b5d1ab0..1c1e7b6 100644 --- a/src/s2python/s2_connection.py +++ b/src/s2python/s2_connection.py @@ -1,3 +1,11 @@ +__all__ = [ + "AssetDetails", + "S2MessageHandler", + "SendOkay", + "MessageHandlers", + "S2Connection" +] # re-export for backward compatibility + try: import websockets except ImportError as exc: @@ -12,8 +20,7 @@ import threading import uuid import ssl -from dataclasses import dataclass -from typing import Any, Optional, List, Type, Dict, Callable, Awaitable, Union +from typing import Any, Optional, List, Dict, Awaitable from websockets.asyncio.client import ( ClientConnection as WSConnection, @@ -25,173 +32,21 @@ ReceptionStatus, Handshake, EnergyManagementRole, - Role, HandshakeResponse, - ResourceManagerDetails, - Duration, - Currency, SelectControlType, ) -from s2python.generated.gen_s2 import CommodityQuantity from s2python.reception_status_awaiter import ReceptionStatusAwaiter from s2python.s2_control_type import S2ControlType from s2python.s2_parser import S2Parser from s2python.s2_validation_error import S2ValidationError +from s2python.s2_asset_details import AssetDetails +from s2python.s2_message_handlers import S2MessageHandler, SendOkay, MessageHandlers from s2python.message import S2Message from s2python.version import S2_VERSION logger = logging.getLogger("s2python") -@dataclass -class AssetDetails: # pylint: disable=too-many-instance-attributes - resource_id: uuid.UUID - - provides_forecast: bool - provides_power_measurements: List[CommodityQuantity] - - instruction_processing_delay: Duration - roles: List[Role] - currency: Optional[Currency] = None - - name: Optional[str] = None - manufacturer: Optional[str] = None - model: Optional[str] = None - firmware_version: Optional[str] = None - serial_number: Optional[str] = None - - def to_resource_manager_details( - self, control_types: List[S2ControlType] - ) -> ResourceManagerDetails: - return ResourceManagerDetails( - available_control_types=[ - control_type.get_protocol_control_type() - for control_type in control_types - ], - currency=self.currency, - firmware_version=self.firmware_version, - instruction_processing_delay=self.instruction_processing_delay, - manufacturer=self.manufacturer, - message_id=uuid.uuid4(), - model=self.model, - name=self.name, - provides_forecast=self.provides_forecast, - provides_power_measurement_types=self.provides_power_measurements, - resource_id=self.resource_id, - roles=self.roles, - serial_number=self.serial_number, - ) - - -S2MessageHandler = Union[ - Callable[["S2Connection", S2Message, Callable[[], None]], None], - Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]], -] - - -class SendOkay: - status_is_send: threading.Event - connection: "S2Connection" - subject_message_id: uuid.UUID - - def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID): - self.status_is_send = threading.Event() - self.connection = connection - self.subject_message_id = subject_message_id - - async def run_async(self) -> None: - self.status_is_send.set() - - await self.connection._respond_with_reception_status( # pylint: disable=protected-access - subject_message_id=self.subject_message_id, - status=ReceptionStatusValues.OK, - diagnostic_label="Processed okay.", - ) - - def run_sync(self) -> None: - self.status_is_send.set() - - self.connection.respond_with_reception_status_sync( - subject_message_id=self.subject_message_id, - status=ReceptionStatusValues.OK, - diagnostic_label="Processed okay.", - ) - - async def ensure_send_async(self, type_msg: Type[S2Message]) -> None: - if not self.status_is_send.is_set(): - logger.warning( - "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " - "Sending it now.", - type_msg, - self.subject_message_id, - ) - await self.run_async() - - def ensure_send_sync(self, type_msg: Type[S2Message]) -> None: - if not self.status_is_send.is_set(): - logger.warning( - "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " - "Sending it now.", - type_msg, - self.subject_message_id, - ) - self.run_sync() - - -class MessageHandlers: - handlers: Dict[Type[S2Message], S2MessageHandler] - - def __init__(self) -> None: - self.handlers = {} - - async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None: - """Handle the S2 message using the registered handler. - - :param connection: The S2 conncetion the `msg` is received from. - :param msg: The S2 message - """ - handler = self.handlers.get(type(msg)) - if handler is not None: - send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr] - - try: - if asyncio.iscoroutinefunction(handler): - await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type] - await send_okay.ensure_send_async(type(msg)) - else: - - def do_message() -> None: - handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type] - send_okay.ensure_send_sync(type(msg)) - - eventloop = asyncio.get_event_loop() - await eventloop.run_in_executor(executor=None, func=do_message) - except Exception: - if not send_okay.status_is_send.is_set(): - await connection._respond_with_reception_status( # pylint: disable=protected-access - subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr] - status=ReceptionStatusValues.PERMANENT_ERROR, - diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long - f"an unrecoverable error occurred.", - ) - raise - else: - logger.warning( - "Received a message of type %s but no handler is registered. Ignoring the message.", - type(msg), - ) - - def register_handler( - self, msg_type: Type[S2Message], handler: S2MessageHandler - ) -> None: - """Register a coroutine function or a normal function as the handler for a specific S2 message type. - - :param msg_type: The S2 message type to attach the handler to. - :param handler: The function (asynchronuous or normal) which should handle the S2 message. - """ - self.handlers[msg_type] = handler - - class S2Connection: # pylint: disable=too-many-instance-attributes url: str reconnect: bool diff --git a/src/s2python/s2_message_handlers.py b/src/s2python/s2_message_handlers.py new file mode 100644 index 0000000..4b62544 --- /dev/null +++ b/src/s2python/s2_message_handlers.py @@ -0,0 +1,122 @@ +import asyncio +import logging +import threading +import uuid +from typing import Type, Dict, Callable, Awaitable, Union, TYPE_CHECKING + +from s2python.common import ReceptionStatusValues +from s2python.message import S2Message + +if TYPE_CHECKING: + from s2python.s2_connection import S2Connection + +logger = logging.getLogger("s2python") + + +S2MessageHandler = Union[ + Callable[["S2Connection", S2Message, Callable[[], None]], None], + Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]], +] + + +class SendOkay: + status_is_send: threading.Event + connection: "S2Connection" + subject_message_id: uuid.UUID + + def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID): + self.status_is_send = threading.Event() + self.connection = connection + self.subject_message_id = subject_message_id + + async def run_async(self) -> None: + self.status_is_send.set() + + await self.connection._respond_with_reception_status( # pylint: disable=protected-access + subject_message_id=self.subject_message_id, + status=ReceptionStatusValues.OK, + diagnostic_label="Processed okay.", + ) + + def run_sync(self) -> None: + self.status_is_send.set() + + self.connection.respond_with_reception_status_sync( + subject_message_id=self.subject_message_id, + status=ReceptionStatusValues.OK, + diagnostic_label="Processed okay.", + ) + + async def ensure_send_async(self, type_msg: Type[S2Message]) -> None: + if not self.status_is_send.is_set(): + logger.warning( + "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " + "Sending it now.", + type_msg, + self.subject_message_id, + ) + await self.run_async() + + def ensure_send_sync(self, type_msg: Type[S2Message]) -> None: + if not self.status_is_send.is_set(): + logger.warning( + "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " + "Sending it now.", + type_msg, + self.subject_message_id, + ) + self.run_sync() + + +class MessageHandlers: + handlers: Dict[Type[S2Message], S2MessageHandler] + + def __init__(self) -> None: + self.handlers = {} + + async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None: + """Handle the S2 message using the registered handler. + + :param connection: The S2 conncetion the `msg` is received from. + :param msg: The S2 message + """ + handler = self.handlers.get(type(msg)) + if handler is not None: + send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr] + + try: + if asyncio.iscoroutinefunction(handler): + await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type] + await send_okay.ensure_send_async(type(msg)) + else: + + def do_message() -> None: + handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type] + send_okay.ensure_send_sync(type(msg)) + + eventloop = asyncio.get_event_loop() + await eventloop.run_in_executor(executor=None, func=do_message) + except Exception: + if not send_okay.status_is_send.is_set(): + await connection._respond_with_reception_status( # pylint: disable=protected-access + subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr] + status=ReceptionStatusValues.PERMANENT_ERROR, + diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long + f"an unrecoverable error occurred.", + ) + raise + else: + logger.warning( + "Received a message of type %s but no handler is registered. Ignoring the message.", + type(msg), + ) + + def register_handler( + self, msg_type: Type[S2Message], handler: S2MessageHandler + ) -> None: + """Register a coroutine function or a normal function as the handler for a specific S2 message type. + + :param msg_type: The S2 message type to attach the handler to. + :param handler: The function (asynchronuous or normal) which should handle the S2 message. + """ + self.handlers[msg_type] = handler