Skip to content

Commit

Permalink
feat: improve logging and message retransmission behavior (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptk committed May 30, 2023
1 parent 9af4e36 commit 2712c6d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 23 deletions.
4 changes: 2 additions & 2 deletions pyomnilogic_local/api.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ async def async_send_message(self, message_type: MessageType, message: str | Non
resp: str | None = None
try:
if need_response:
resp = await asyncio.wait_for(protocol.send_and_receive(message_type, message), self.response_timeout)
resp = await protocol.send_and_receive(message_type, message)
else:
await asyncio.wait_for(protocol.send_message(message_type, message), self.response_timeout)
await protocol.send_message(message_type, message)
finally:
transport.close()

Expand Down
9 changes: 7 additions & 2 deletions pyomnilogic_local/models/mspconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import sys
from typing import Any, Literal, TypeAlias

from ..types import OmniParsingException

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self


from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, ValidationError
from xmltodict import parse as xml_parse

from ..types import (
Expand Down Expand Up @@ -209,4 +211,7 @@ def load_xml(xml: str) -> MSPConfig:
OmniType.SCHE,
),
)
return MSPConfig.parse_obj(data["MSPConfig"])
try:
return MSPConfig.parse_obj(data["MSPConfig"])
except ValidationError as exc:
raise OmniParsingException(f"Failed to parse MSP Configuration: {exc}") from exc
8 changes: 6 additions & 2 deletions pyomnilogic_local/models/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing import Any, SupportsInt, TypeAlias, TypeVar, cast, overload

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, ValidationError
from xmltodict import parse as xml_parse

from ..types import (
Expand All @@ -17,6 +17,7 @@
FilterWhyOn,
HeaterMode,
HeaterState,
OmniParsingException,
OmniType,
PumpState,
RelayState,
Expand Down Expand Up @@ -226,7 +227,10 @@ def xml_postprocessor(path: Any, key: Any, value: SupportsInt | Any) -> tuple[An
OmniType.VALVE_ACTUATOR,
),
)
return Telemetry.parse_obj(data["STATUS"])
try:
return Telemetry.parse_obj(data["STATUS"])
except ValidationError as exc:
raise OmniParsingException(f"Failed to parse Telemetry: {exc}") from exc

def get_telem_by_systemid(self, system_id: int) -> TelemetryType | None:
for field_name, value in self:
Expand Down
52 changes: 35 additions & 17 deletions pyomnilogic_local/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing_extensions import Self

from .models.leadmessage import LeadMessage
from .types import ClientType, MessageType
from .types import ClientType, MessageType, OmniTimeoutException

_LOGGER = logging.getLogger(__name__)

Expand All @@ -23,9 +23,9 @@ class OmniLogicMessage:
client_type: ClientType = ClientType.SIMPLE
version: str = "1.19"
timestamp: int | None
reserved_1: int
compressed: int
reserved_2: int
reserved_1: int = 0
compressed: int = 0
reserved_2: int = 0

def __init__(self, msg_id: int, msg_type: MessageType, payload: str | None = None, version: str = "1.19") -> None:
self.id = msg_id
Expand All @@ -52,6 +52,11 @@ def __bytes__(self) -> bytes:
)
return header + self.payload

def __repr__(self) -> str:
if self.compressed or self.type is MessageType.MSP_BLOCKMESSAGE:
return f"ID: {self.id}, Type: {self.type}, Compressed: {self.compressed}"
return f"ID: {self.id}, Type: {self.type}, Compressed: {self.compressed}, Body: {self.payload[:-1].decode('utf-8')}"

@classmethod
def from_bytes(cls, data: bytes) -> Self:
# split the header and data
Expand All @@ -73,6 +78,10 @@ def from_bytes(cls, data: bytes) -> Self:

class OmniLogicProtocol(asyncio.DatagramProtocol):
transport: asyncio.DatagramTransport
# The omni will re-transmit a packet every 2 seconds if it does not receive an ACK. We pad that just a touch to be safe
_omni_retransmit_time = 2.1
# The omni will re-transmit 5 times (a total of 6 attempts including the initial) if it does not receive an ACK
_omni_retransmit_count = 5

def __init__(self) -> None:
self.data_queue = asyncio.Queue[OmniLogicMessage]()
Expand All @@ -86,10 +95,7 @@ def connection_lost(self, exc: Exception | None) -> None:

def datagram_received(self, data: bytes, addr: tuple[str | Any, int]) -> None:
message = OmniLogicMessage.from_bytes(data)
if message.compressed:
_LOGGER.debug("Received compressed message ID: %s, Type: %s", message.id, message.type)
else:
_LOGGER.debug("Received Message ID: %s, Type: %s", message.id, message.type)
_LOGGER.debug("Received Message %s", str(message))
self.data_queue.put_nowait(message)

def error_received(self, exc: Exception) -> None:
Expand All @@ -115,9 +121,8 @@ async def _wait_for_ack(self, ack_id: int) -> None:
# eventually time out waiting for it, that way we can deal with the dropped packets
message = await self.data_queue.get()

async def _ensure_sent(self, message: OmniLogicMessage) -> None:
delivered = False
while not delivered:
async def _ensure_sent(self, message: OmniLogicMessage, max_attempts: int = 5) -> None:
for attempt in range(0, max_attempts):
self.transport.sendto(bytes(message))

# If the message that we just sent is an ACK, we do not need to wait to receive an ACK, we are done
Expand All @@ -127,9 +132,12 @@ async def _ensure_sent(self, message: OmniLogicMessage) -> None:
# Wait for a bit to either receive an ACK for our message, otherwise, we retry delivery
try:
await asyncio.wait_for(self._wait_for_ack(message.id), 0.25)
delivered = True
except TimeoutError:
_LOGGER.debug("ACK not received, re-attempting delivery")
return
except TimeoutError as exc:
if attempt < 4:
_LOGGER.debug("ACK not received, re-attempting delivery")
else:
raise OmniTimeoutException("Failed to receive acknowledgement of command, max retries exceeded") from exc

async def send_and_receive(self, msg_type: MessageType, payload: str | None, msg_id: int | None = None) -> str:
await self.send_message(msg_type, payload, msg_id)
Expand All @@ -141,10 +149,10 @@ async def send_message(self, msg_type: MessageType, payload: str | None, msg_id:
if not msg_id:
msg_id = random.randrange(2**32)

_LOGGER.debug("Sending Message ID: %s, Message Type: %s, Request Body: %s", msg_id, msg_type.name, payload)

message = OmniLogicMessage(msg_id, msg_type, payload)

_LOGGER.debug("Sending Message %s", str(message))

await self._ensure_sent(message)

async def _send_ack(self, msg_id: int) -> None:
Expand All @@ -170,17 +178,27 @@ async def _receive_file(self) -> str:
if message.type == MessageType.MSP_LEADMESSAGE:
leadmsg = LeadMessage.from_orm(ET.fromstring(message.payload[:-1]))

_LOGGER.debug("Will receive %s blockmessages", leadmsg.msg_block_count)

# Wait for the block data data
retval: bytes = b""
# If we received a LeadMessage, continue to receive messages until we have all of our data
# Fragments of data may arrive out of order, so we store them in a buffer as they arrive and sort them after
data_fragments: dict[int, bytes] = {}
while len(data_fragments) < leadmsg.msg_block_count:
resp = await self.data_queue.get()
# We need to wait long enough for the Omni to get through all of it's retries before we bail out.
try:
resp = await asyncio.wait_for(self.data_queue.get(), self._omni_retransmit_time * self._omni_retransmit_count)
except TimeoutError as exc:
raise OmniTimeoutException from exc

# We only want to collect blockmessages here
if resp.type is not MessageType.MSP_BLOCKMESSAGE:
_LOGGER.debug("Received a message other than a blockmessage: %s", resp.type)
continue

await self._send_ack(resp.id)

# remove an 8 byte header to get to the payload data
data_fragments[resp.id] = resp.payload[8:]

Expand Down
12 changes: 12 additions & 0 deletions pyomnilogic_local/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,15 @@ class SensorUnits(str, PrettyEnum):
class ValveActuatorState(PrettyEnum):
OFF = 0
ON = 1


class OmniLogicException(Exception):
pass


class OmniTimeoutException(OmniLogicException):
pass


class OmniParsingException(OmniLogicException):
pass

0 comments on commit 2712c6d

Please sign in to comment.