Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/s2python/s2_asset_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import logging
import uuid
from dataclasses import dataclass
from typing import Optional, List

from s2python.common import (
Role,
ResourceManagerDetails,
Duration,
Currency,
)
from s2python.generated.gen_s2 import CommodityQuantity
from s2python.s2_control_type import S2ControlType

logger = logging.getLogger("s2python")


@dataclass
class AssetDetails: # pylint: disable=too-many-instance-attributes
resource_id: uuid.UUID

provides_forecast: bool
provides_power_measurements: List[CommodityQuantity]

instruction_processing_delay: Duration
roles: List[Role]
currency: Optional[Currency] = None

name: Optional[str] = None
manufacturer: Optional[str] = None
model: Optional[str] = None
firmware_version: Optional[str] = None
serial_number: Optional[str] = None

def to_resource_manager_details(
self, control_types: List[S2ControlType]
) -> ResourceManagerDetails:
return ResourceManagerDetails(
available_control_types=[
control_type.get_protocol_control_type()
for control_type in control_types
],
currency=self.currency,
firmware_version=self.firmware_version,
instruction_processing_delay=self.instruction_processing_delay,
manufacturer=self.manufacturer,
message_id=uuid.uuid4(),
model=self.model,
name=self.name,
provides_forecast=self.provides_forecast,
provides_power_measurement_types=self.provides_power_measurements,
resource_id=self.resource_id,
roles=self.roles,
serial_number=self.serial_number,
)
167 changes: 11 additions & 156 deletions src/s2python/s2_connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
__all__ = [
"AssetDetails",
"S2MessageHandler",
"SendOkay",
"MessageHandlers",
"S2Connection"
] # re-export for backward compatibility

try:
import websockets
except ImportError as exc:
Expand All @@ -12,8 +20,7 @@
import threading
import uuid
import ssl
from dataclasses import dataclass
from typing import Any, Optional, List, Type, Dict, Callable, Awaitable, Union
from typing import Any, Optional, List, Dict, Awaitable

from websockets.asyncio.client import (
ClientConnection as WSConnection,
Expand All @@ -25,173 +32,21 @@
ReceptionStatus,
Handshake,
EnergyManagementRole,
Role,
HandshakeResponse,
ResourceManagerDetails,
Duration,
Currency,
SelectControlType,
)
from s2python.generated.gen_s2 import CommodityQuantity
from s2python.reception_status_awaiter import ReceptionStatusAwaiter
from s2python.s2_control_type import S2ControlType
from s2python.s2_parser import S2Parser
from s2python.s2_validation_error import S2ValidationError
from s2python.s2_asset_details import AssetDetails
from s2python.s2_message_handlers import S2MessageHandler, SendOkay, MessageHandlers
from s2python.message import S2Message
from s2python.version import S2_VERSION

logger = logging.getLogger("s2python")


@dataclass
class AssetDetails: # pylint: disable=too-many-instance-attributes
resource_id: uuid.UUID

provides_forecast: bool
provides_power_measurements: List[CommodityQuantity]

instruction_processing_delay: Duration
roles: List[Role]
currency: Optional[Currency] = None

name: Optional[str] = None
manufacturer: Optional[str] = None
model: Optional[str] = None
firmware_version: Optional[str] = None
serial_number: Optional[str] = None

def to_resource_manager_details(
self, control_types: List[S2ControlType]
) -> ResourceManagerDetails:
return ResourceManagerDetails(
available_control_types=[
control_type.get_protocol_control_type()
for control_type in control_types
],
currency=self.currency,
firmware_version=self.firmware_version,
instruction_processing_delay=self.instruction_processing_delay,
manufacturer=self.manufacturer,
message_id=uuid.uuid4(),
model=self.model,
name=self.name,
provides_forecast=self.provides_forecast,
provides_power_measurement_types=self.provides_power_measurements,
resource_id=self.resource_id,
roles=self.roles,
serial_number=self.serial_number,
)


S2MessageHandler = Union[
Callable[["S2Connection", S2Message, Callable[[], None]], None],
Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]],
]


class SendOkay:
status_is_send: threading.Event
connection: "S2Connection"
subject_message_id: uuid.UUID

def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID):
self.status_is_send = threading.Event()
self.connection = connection
self.subject_message_id = subject_message_id

async def run_async(self) -> None:
self.status_is_send.set()

await self.connection._respond_with_reception_status( # pylint: disable=protected-access
subject_message_id=self.subject_message_id,
status=ReceptionStatusValues.OK,
diagnostic_label="Processed okay.",
)

def run_sync(self) -> None:
self.status_is_send.set()

self.connection.respond_with_reception_status_sync(
subject_message_id=self.subject_message_id,
status=ReceptionStatusValues.OK,
diagnostic_label="Processed okay.",
)

async def ensure_send_async(self, type_msg: Type[S2Message]) -> None:
if not self.status_is_send.is_set():
logger.warning(
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
"Sending it now.",
type_msg,
self.subject_message_id,
)
await self.run_async()

def ensure_send_sync(self, type_msg: Type[S2Message]) -> None:
if not self.status_is_send.is_set():
logger.warning(
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
"Sending it now.",
type_msg,
self.subject_message_id,
)
self.run_sync()


class MessageHandlers:
handlers: Dict[Type[S2Message], S2MessageHandler]

def __init__(self) -> None:
self.handlers = {}

async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None:
"""Handle the S2 message using the registered handler.

:param connection: The S2 conncetion the `msg` is received from.
:param msg: The S2 message
"""
handler = self.handlers.get(type(msg))
if handler is not None:
send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr]

try:
if asyncio.iscoroutinefunction(handler):
await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type]
await send_okay.ensure_send_async(type(msg))
else:

def do_message() -> None:
handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type]
send_okay.ensure_send_sync(type(msg))

eventloop = asyncio.get_event_loop()
await eventloop.run_in_executor(executor=None, func=do_message)
except Exception:
if not send_okay.status_is_send.is_set():
await connection._respond_with_reception_status( # pylint: disable=protected-access
subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr]
status=ReceptionStatusValues.PERMANENT_ERROR,
diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long
f"an unrecoverable error occurred.",
)
raise
else:
logger.warning(
"Received a message of type %s but no handler is registered. Ignoring the message.",
type(msg),
)

def register_handler(
self, msg_type: Type[S2Message], handler: S2MessageHandler
) -> None:
"""Register a coroutine function or a normal function as the handler for a specific S2 message type.

:param msg_type: The S2 message type to attach the handler to.
:param handler: The function (asynchronuous or normal) which should handle the S2 message.
"""
self.handlers[msg_type] = handler


class S2Connection: # pylint: disable=too-many-instance-attributes
url: str
reconnect: bool
Expand Down
122 changes: 122 additions & 0 deletions src/s2python/s2_message_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import asyncio
import logging
import threading
import uuid
from typing import Type, Dict, Callable, Awaitable, Union, TYPE_CHECKING

from s2python.common import ReceptionStatusValues
from s2python.message import S2Message

if TYPE_CHECKING:
from s2python.s2_connection import S2Connection

logger = logging.getLogger("s2python")


S2MessageHandler = Union[
Callable[["S2Connection", S2Message, Callable[[], None]], None],
Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]],
]


class SendOkay:
status_is_send: threading.Event
connection: "S2Connection"
subject_message_id: uuid.UUID

def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID):
self.status_is_send = threading.Event()
self.connection = connection
self.subject_message_id = subject_message_id

async def run_async(self) -> None:
self.status_is_send.set()

await self.connection._respond_with_reception_status( # pylint: disable=protected-access
subject_message_id=self.subject_message_id,
status=ReceptionStatusValues.OK,
diagnostic_label="Processed okay.",
)

def run_sync(self) -> None:
self.status_is_send.set()

self.connection.respond_with_reception_status_sync(
subject_message_id=self.subject_message_id,
status=ReceptionStatusValues.OK,
diagnostic_label="Processed okay.",
)

async def ensure_send_async(self, type_msg: Type[S2Message]) -> None:
if not self.status_is_send.is_set():
logger.warning(
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
"Sending it now.",
type_msg,
self.subject_message_id,
)
await self.run_async()

def ensure_send_sync(self, type_msg: Type[S2Message]) -> None:
if not self.status_is_send.is_set():
logger.warning(
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
"Sending it now.",
type_msg,
self.subject_message_id,
)
self.run_sync()


class MessageHandlers:
handlers: Dict[Type[S2Message], S2MessageHandler]

def __init__(self) -> None:
self.handlers = {}

async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None:
"""Handle the S2 message using the registered handler.

:param connection: The S2 conncetion the `msg` is received from.
:param msg: The S2 message
"""
handler = self.handlers.get(type(msg))
if handler is not None:
send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr]

try:
if asyncio.iscoroutinefunction(handler):
await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type]
await send_okay.ensure_send_async(type(msg))
else:

def do_message() -> None:
handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type]
send_okay.ensure_send_sync(type(msg))

eventloop = asyncio.get_event_loop()
await eventloop.run_in_executor(executor=None, func=do_message)
except Exception:
if not send_okay.status_is_send.is_set():
await connection._respond_with_reception_status( # pylint: disable=protected-access
subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr]
status=ReceptionStatusValues.PERMANENT_ERROR,
diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long
f"an unrecoverable error occurred.",
)
raise
else:
logger.warning(
"Received a message of type %s but no handler is registered. Ignoring the message.",
type(msg),
)

def register_handler(
self, msg_type: Type[S2Message], handler: S2MessageHandler
) -> None:
"""Register a coroutine function or a normal function as the handler for a specific S2 message type.

:param msg_type: The S2 message type to attach the handler to.
:param handler: The function (asynchronuous or normal) which should handle the S2 message.
"""
self.handlers[msg_type] = handler