Skip to content

Commit

Permalink
change RemoteValue to ABC and typecheck it
Browse files Browse the repository at this point in the history
  • Loading branch information
farmio committed Dec 17, 2020
1 parent 30720c7 commit 47487e2
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 93 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ warn_redundant_casts = true
warn_unused_configs = true

# add the modules below once we add typing for them so that we fail the build in the future if someone changes something without updating the typings
[mypy-xknx.core.*,xknx.devices.travelcalculator,xknx.exceptions.*,xknx.io.gateway_scanner,xknx.telegram.*,]
[mypy-xknx.core.*,xknx.devices.travelcalculator,xknx.exceptions.*,xknx.io.gateway_scanner,xknx.remote_value.remote_value,xknx.telegram.*,]
strict = true
ignore_errors = false
warn_unreachable = true
Expand Down
2 changes: 2 additions & 0 deletions test/core_tests/state_updater_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from xknx.telegram import GroupAddress


@patch.multiple(RemoteValue, __abstractmethods__=set())
class TestStateUpdater(unittest.TestCase):
"""Test class for state updater."""

Expand All @@ -19,6 +20,7 @@ def setUp(self):
"""Set up test class."""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
patch.multiple(RemoteValue, __abstractmethods__=set())

def tearDown(self):
"""Tear down test class."""
Expand Down
21 changes: 1 addition & 20 deletions test/remote_value_tests/remote_value_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from xknx.telegram.apci import GroupValueWrite


@patch.multiple(RemoteValue, __abstractmethods__=set())
class TestRemoteValue(unittest.TestCase):
"""Test class for RemoteValue objects."""

Expand All @@ -33,26 +34,6 @@ def test_warn_payload_valid(self):
"'payload_valid()' not implemented for %s", "RemoteValue"
)

def test_warn_to_knx(self):
"""Test for warning if to_knx is not implemented."""
xknx = XKNX()
remote_value = RemoteValue(xknx)
with patch("logging.Logger.warning") as mock_warn:
remote_value.to_knx(23)
mock_warn.assert_called_with(
"'to_knx()' not implemented for %s", "RemoteValue"
)

def test_warn_from_knx(self):
"""Test for warning if from_knx is not implemented."""
xknx = XKNX()
remote_value = RemoteValue(xknx)
with patch("logging.Logger.warning") as mock_warn:
remote_value.from_knx(DPTBinary(0))
mock_warn.assert_called_with(
"'from_knx()' not implemented for %s", "RemoteValue"
)

def test_info_set_uninitialized(self):
"""Test for info if RemoteValue is not initialized."""
xknx = XKNX()
Expand Down
3 changes: 3 additions & 0 deletions test/str_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Unit test for String representations."""
import asyncio
import unittest
from unittest.mock import patch

from xknx import XKNX
from xknx.devices import (
Expand Down Expand Up @@ -72,9 +73,11 @@ def tearDown(self):
"""Tear down test class."""
self.loop.close()

@patch.multiple(RemoteValue, __abstractmethods__=set())
def test_remote_value(self):
"""Test string representation of remote value."""
xknx = XKNX()
# pylint: disable=abstract-class-instantiated
remote_value = RemoteValue(
xknx,
group_address="1/2/3",
Expand Down
6 changes: 3 additions & 3 deletions xknx/exceptions/exception.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Module for XKXN Exceptions."""
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Optional, Tuple, Union


class XKNXException(Exception):
Expand Down Expand Up @@ -31,7 +31,7 @@ def __init__(self, message: str, should_log: bool = True) -> None:
class CouldNotParseTelegram(XKNXException):
"""Could not parse telegram error."""

def __init__(self, description: str, **kwargs: Dict[str, str]) -> None:
def __init__(self, description: str, **kwargs: str) -> None:
"""Initialize CouldNotParseTelegram class."""
super().__init__()
self.description = description
Expand Down Expand Up @@ -78,7 +78,7 @@ def __str__(self) -> str:
class ConversionError(XKNXException):
"""Exception class for error while converting one type to another."""

def __init__(self, description: str, **kwargs: Dict[str, str]) -> None:
def __init__(self, description: str, **kwargs: str) -> None:
"""Initialize ConversionError class."""
super().__init__()
self.description = description
Expand Down
137 changes: 82 additions & 55 deletions xknx/remote_value/remote_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,63 +6,83 @@
- a group address for reading a KNX value,
- or a group of both representing the same value.
"""
from abc import ABC, abstractmethod
import logging
from typing import List
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Generator,
List,
Optional,
Union,
)

from xknx.exceptions import CouldNotParseTelegram
from xknx.telegram import GroupAddress, Telegram
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite

if TYPE_CHECKING:
from xknx.dpt import DPTArray, DPTBinary
from xknx.telegram.address import GroupAddressableType
from xknx.xknx import XKNX

AsyncCallback = Callable[[], Awaitable[None]]
DPTPayload = Union[DPTArray, DPTBinary]

logger = logging.getLogger("xknx.log")


class RemoteValue:
class RemoteValue(ABC):
"""Class for managing remote knx value."""

# pylint: disable=too-many-instance-attributes
def __init__(
self,
xknx,
group_address=None,
group_address_state=None,
sync_state=True,
device_name=None,
feature_name=None,
after_update_cb=None,
passive_group_addresses: List[str] = None,
xknx: "XKNX",
group_address: Optional["GroupAddressableType"] = None,
group_address_state: Optional["GroupAddressableType"] = None,
sync_state: bool = True,
device_name: Optional[str] = None,
feature_name: Optional[str] = None,
after_update_cb: Optional["AsyncCallback"] = None,
passive_group_addresses: Optional[List["GroupAddressableType"]] = None,
):
"""Initialize RemoteValue class."""
# pylint: disable=too-many-arguments
self.xknx = xknx
self.passive_group_addresses = RemoteValue.get_passive_group_addresses(
passive_group_addresses
)

if group_address is not None:
group_address = GroupAddress(group_address)
if group_address_state is not None:
group_address_state = GroupAddress(group_address_state)
self.group_address: Optional[GroupAddress] = group_address
self.group_address_state: Optional[GroupAddress] = group_address_state

self.passive_group_addresses: List[
GroupAddress
] = RemoteValue.get_passive_group_addresses(passive_group_addresses)

self.group_address = group_address
self.group_address_state = group_address_state
self.device_name = "Unknown" if device_name is None else device_name
self.feature_name = "Unknown" if feature_name is None else feature_name
self.device_name: str = "Unknown" if device_name is None else device_name
self.feature_name: str = "Unknown" if feature_name is None else feature_name
self.after_update_cb = after_update_cb
self.payload = None
self.payload: Optional["DPTPayload"] = None

if sync_state and self.group_address_state:
self.xknx.state_updater.register_remote_value(
self, tracker_options=sync_state
)

def __del__(self):
def __del__(self) -> None:
"""Destructor. Removing self from StateUpdater if was registered."""
try:
self.xknx.state_updater.unregister_remote_value(self)
except KeyError:
pass

@property
def initialized(self):
def initialized(self) -> bool:
"""Evaluate if remote value is initialized with group address."""
return bool(
self.group_address_state
Expand All @@ -71,47 +91,49 @@ def initialized(self):
)

@property
def readable(self):
def readable(self) -> bool:
"""Evaluate if remote value should be read from bus."""
return bool(self.group_address_state)

@property
def writable(self):
def writable(self) -> bool:
"""Evaluate if remote value has a group_address set."""
return bool(self.group_address)

def has_group_address(self, group_address):
def has_group_address(self, group_address: GroupAddress) -> bool:
"""Test if device has given group address."""

def _internal_addresses():
def _internal_addresses() -> Generator[Optional[GroupAddress], None, None]:
"""Yield all group_addresses."""
yield self.group_address
yield self.group_address_state
yield from self.passive_group_addresses

return group_address in _internal_addresses()

def payload_valid(self, payload):
@abstractmethod
# TODO: typing - remove Optional
def payload_valid(self, payload: Optional["DPTPayload"]) -> bool:
"""Test if telegram payload may be parsed - to be implemented in derived class.."""
# pylint: disable=unused-argument
logger.warning(
"'payload_valid()' not implemented for %s", self.__class__.__name__
)
return True

def from_knx(self, payload):
@abstractmethod
def from_knx(self, payload: "DPTPayload") -> Any:
"""Convert current payload to value - to be implemented in derived class."""
# pylint: disable=unused-argument
logger.warning("'from_knx()' not implemented for %s", self.__class__.__name__)

def to_knx(self, value):
@abstractmethod
def to_knx(self, value: Any) -> "DPTPayload":
"""Convert value to payload - to be implemented in derived class."""
# pylint: disable=unused-argument
logger.warning("'to_knx()' not implemented for %s", self.__class__.__name__)

async def process(self, telegram, always_callback=False):
async def process(self, telegram: Telegram, always_callback: bool = False) -> bool:
"""Process incoming or outgoing telegram."""
if not self.has_group_address(telegram.destination_address):
if not isinstance(
telegram.destination_address, GroupAddress
) or not self.has_group_address(telegram.destination_address):
return False
if not isinstance(
telegram.payload,
Expand All @@ -123,17 +145,17 @@ async def process(self, telegram, always_callback=False):
raise CouldNotParseTelegram(
"payload not a GroupValueWrite or GroupValueResponse",
payload=telegram.payload,
destination_address=telegram.destination_address,
source_address=telegram.source_address,
destination_address=str(telegram.destination_address),
source_address=str(telegram.source_address),
device_name=self.device_name,
feature_name=self.feature_name,
)
if not self.payload_valid(telegram.payload.value):
raise CouldNotParseTelegram(
"payload invalid",
payload=telegram.payload,
destination_address=telegram.destination_address,
source_address=telegram.source_address,
payload=str(telegram.payload),
destination_address=str(telegram.destination_address),
source_address=str(telegram.source_address),
device_name=self.device_name,
feature_name=self.feature_name,
)
Expand All @@ -149,23 +171,26 @@ async def process(self, telegram, always_callback=False):
return True

@property
def value(self):
def value(self) -> Any:
"""Return current value."""
if self.payload is None:
return None
return self.from_knx(self.payload)

async def _send(self, payload, response=False):
async def _send(self, payload: "DPTPayload", response: bool = False) -> None:
"""Send payload as telegram to KNX bus."""
telegram = Telegram(
destination_address=self.group_address,
payload=(
GroupValueResponse(payload) if response else GroupValueWrite(payload)
),
)
await self.xknx.telegrams.put(telegram)
if self.group_address is not None:
telegram = Telegram(
destination_address=self.group_address,
payload=(
GroupValueResponse(payload)
if response
else GroupValueWrite(payload)
),
)
await self.xknx.telegrams.put(telegram)

async def set(self, value, response=False):
async def set(self, value: Any, response: bool = False) -> None:
"""Set new value."""
if not self.initialized:
logger.info(
Expand All @@ -188,14 +213,14 @@ async def set(self, value, response=False):
await self._send(payload, response)
# self.payload is set and after_update_cb() called when the outgoing telegram is processed.

async def respond(self):
async def respond(self) -> None:
"""Send current payload as GroupValueResponse telegram to KNX bus."""
if self.payload is not None:
await self._send(self.payload, response=True)

async def read_state(self, wait_for_result: bool = False) -> None:
"""Send GroupValueRead telegram for state address to KNX bus."""
if self.readable:
if self.group_address_state is not None:
# pylint: disable=import-outside-toplevel
# TODO: send a ReadRequset and start a timeout from here instead of ValueReader
# cancel timeout form process(); delete ValueReader
Expand All @@ -217,11 +242,11 @@ async def read_state(self, wait_for_result: bool = False) -> None:
await value_reader.send_group_read()

@property
def unit_of_measurement(self):
def unit_of_measurement(self) -> Optional[str]:
"""Return the unit of measurement."""
return None

def group_addr_str(self):
def group_addr_str(self) -> str:
"""Return object as readable string."""
return "{}/{}/{}/{}".format(
self.group_address.__repr__(),
Expand All @@ -230,7 +255,7 @@ def group_addr_str(self):
self.value,
)

def __str__(self):
def __str__(self) -> str:
"""Return object as string representation."""
return '<{} device_name="{}" feature_name="{}" {}/>'.format(
self.__class__.__name__,
Expand All @@ -239,7 +264,7 @@ def __str__(self):
self.group_addr_str(),
)

def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
"""Equal operator."""
for key, value in self.__dict__.items():
if key == "after_update_cb":
Expand All @@ -256,7 +281,9 @@ def __eq__(self, other):
return True

@staticmethod
def get_passive_group_addresses(passive_group_addresses: List[str]) -> List:
def get_passive_group_addresses(
passive_group_addresses: Optional[List["GroupAddressableType"]],
) -> List[GroupAddress]:
"""Obtain passive state group addresses."""
if passive_group_addresses is None:
return []
Expand Down
Loading

0 comments on commit 47487e2

Please sign in to comment.