Skip to content

Commit

Permalink
Merge 025f4fd into 5ef5cb6
Browse files Browse the repository at this point in the history
  • Loading branch information
farmio committed Apr 6, 2021
2 parents 5ef5cb6 + 025f4fd commit e26067d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 43 deletions.
8 changes: 6 additions & 2 deletions changelog.md
Expand Up @@ -2,20 +2,24 @@

## Unreleased changes

## Devices

- Add support for cover lock
- ExposeSensor values can now be read from other xknx devices that share a group address
- Add more information to sensors and binary sensors in the HA integration
- Store last Telegram and decoded value in RemoteValue


### Breaking Changes

- Remove configuration handling from core library (use https://xknx.io/config-converter)
- Drop support for python 3.7

### Internals

- Drop support for python 3.7
- use pytest tests instead of unittest TestCase
- Move RequestResponse and subclasses to xknx.io.request_response.*
- Move ConnectionConfig to xknx.io.connection
- Store last Telegram and decoded value in RemoteValue

## 0.17.5 Add support for unique ids 2021-03-30

Expand Down
123 changes: 85 additions & 38 deletions test/devices_tests/sensor_expose_loop_test.py
@@ -1,10 +1,12 @@
"""Unit test for Sensor and ExposeSensor objects."""
from unittest.mock import AsyncMock, call

import pytest
from xknx import XKNX
from xknx.devices import BinarySensor, ExposeSensor, Sensor
from xknx.dpt import DPTArray, DPTBinary
from xknx.telegram import GroupAddress, Telegram, TelegramDirection
from xknx.telegram.apci import GroupValueWrite
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite


@pytest.mark.asyncio
Expand Down Expand Up @@ -178,45 +180,69 @@ class TestSensorExposeLoop:
@pytest.mark.parametrize("value_type,test_payload,test_value", test_cases)
async def test_array_sensor_loop(self, value_type, test_payload, test_value):
"""Test sensor and expose_sensor with different values."""

xknx = XKNX()
sensor = Sensor(
xknx,
"TestSensor",
group_address_state="1/1/1",
value_type=value_type,
)
xknx.knxip_interface = AsyncMock()
xknx.rate_limit = False
await xknx.telegram_queue.start()

expose = ExposeSensor(
xknx,
"TestExpose",
group_address="2/2/2",
group_address="1/1/1",
value_type=value_type,
)
assert expose.resolve_state() is None
# set a value from expose - HA sends strings for new values
stringified_value = str(test_value)
await expose.set(stringified_value)

incoming_telegram = Telegram(
outgoing_telegram = Telegram(
destination_address=GroupAddress("1/1/1"),
direction=TelegramDirection.INCOMING,
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(test_payload),
)
await sensor.process(incoming_telegram)

incoming_value = sensor.resolve_state()
await xknx.telegrams.join()
xknx.knxip_interface.send_telegram.assert_called_with(outgoing_telegram)
if isinstance(test_value, float):
assert round(incoming_value, 4) == test_value
assert round(expose.resolve_state(), 4) == test_value
else:
assert incoming_value == test_value
assert expose.resolve_state() == test_value

# HA sends strings for new values
stringified_value = str(test_value)
await expose.set(stringified_value)
# init sensor after expose is set - with same group address
sensor = Sensor(
xknx,
"TestSensor",
group_address_state="1/1/1",
value_type=value_type,
)
assert sensor.resolve_state() is None

assert xknx.telegrams.qsize() == 1
outgoing_telegram = xknx.telegrams.get_nowait()
assert outgoing_telegram == Telegram(
destination_address=GroupAddress("2/2/2"),
# read sensor state (from expose as it has the same GA)
# wait_for_result so we don't have to await self.xknx.telegrams.join()
await sensor.sync(wait_for_result=True)
read_telegram = Telegram(
destination_address=GroupAddress("1/1/1"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(test_payload),
payload=GroupValueRead(),
)
response_telegram = Telegram(
destination_address=GroupAddress("1/1/1"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueResponse(test_payload),
)
xknx.knxip_interface.send_telegram.assert_has_calls(
[
call(read_telegram),
call(response_telegram),
]
)
# test if Sensor has successfully read from ExposeSensor
if isinstance(test_value, float):
assert round(sensor.resolve_state(), 4) == test_value
else:
assert sensor.resolve_state() == test_value
assert expose.resolve_state() == sensor.resolve_state()
await xknx.telegram_queue.stop()


@pytest.mark.asyncio
Expand All @@ -232,31 +258,52 @@ class TestBinarySensorExposeLoop:
)
async def test_binary_sensor_loop(self, value_type, test_payload, test_value):
"""Test binary_sensor and expose_sensor with binary values."""

xknx = XKNX()
sensor = BinarySensor(xknx, "TestSensor", group_address_state="1/1/1")
xknx.knxip_interface = AsyncMock()
xknx.rate_limit = False
await xknx.telegram_queue.start()

expose = ExposeSensor(
xknx,
"TestExpose",
group_address="2/2/2",
group_address="1/1/1",
value_type=value_type,
)
assert expose.resolve_state() is None

incoming_telegram = Telegram(
await expose.set(test_value)
await xknx.telegrams.join()
outgoing_telegram = Telegram(
destination_address=GroupAddress("1/1/1"),
direction=TelegramDirection.INCOMING,
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(test_payload),
)
await sensor.process(incoming_telegram)
xknx.knxip_interface.send_telegram.assert_called_with(outgoing_telegram)
assert expose.resolve_state() == test_value

incoming_value = sensor.is_on()
assert incoming_value == test_value
bin_sensor = BinarySensor(xknx, "TestSensor", group_address_state="1/1/1")
assert bin_sensor.state is None

await expose.set(test_value)
assert xknx.telegrams.qsize() == 1
outgoing_telegram = xknx.telegrams.get_nowait()
assert outgoing_telegram == Telegram(
destination_address=GroupAddress("2/2/2"),
# read sensor state (from expose as it has the same GA)
# wait_for_result so we don't have to await self.xknx.telegrams.join()
await bin_sensor.sync(wait_for_result=True)
read_telegram = Telegram(
destination_address=GroupAddress("1/1/1"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(test_payload),
payload=GroupValueRead(),
)
response_telegram = Telegram(
destination_address=GroupAddress("1/1/1"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueResponse(test_payload),
)
xknx.knxip_interface.send_telegram.assert_has_calls(
[
call(read_telegram),
call(response_telegram),
]
)
# test if Sensor has successfully read from ExposeSensor
assert bin_sensor.state == test_value
assert expose.resolve_state() == bin_sensor.state
await xknx.telegram_queue.stop()
4 changes: 1 addition & 3 deletions xknx/core/telegram_queue.py
Expand Up @@ -15,7 +15,6 @@

from xknx.exceptions import CommunicationError, XKNXException
from xknx.telegram import AddressFilter, GroupAddress, Telegram, TelegramDirection
from xknx.telegram.apci import GroupValueWrite

if TYPE_CHECKING:
from xknx.xknx import XKNX
Expand Down Expand Up @@ -175,8 +174,7 @@ async def process_telegram_outgoing(self, telegram: Telegram) -> None:
telegram_logger.debug(telegram)
if self.xknx.knxip_interface is not None:
await self.xknx.knxip_interface.send_telegram(telegram)
if isinstance(telegram.payload, GroupValueWrite):
await self.xknx.devices.process(telegram)
await self.xknx.devices.process(telegram)

for telegram_received_cb in self.telegram_received_cbs:
if telegram_received_cb.is_within_filter(telegram):
Expand Down
1 change: 1 addition & 0 deletions xknx/core/value_reader.py
Expand Up @@ -41,6 +41,7 @@ async def read(self) -> Telegram | None:
cb_obj = self.xknx.telegram_queue.register_telegram_received_cb(
self.telegram_received,
group_addresses=[self.group_address],
match_for_outgoing=True,
)
await self.send_group_read()

Expand Down

0 comments on commit e26067d

Please sign in to comment.