Skip to content

Commit

Permalink
Merge pull request #533 from basilfx/feature/apci_payload_extended
Browse files Browse the repository at this point in the history
Extend APCI services
  • Loading branch information
farmio committed Jan 1, 2021
2 parents 3ae35d0 + 03e7eca commit d404075
Show file tree
Hide file tree
Showing 11 changed files with 3,072 additions and 41 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- Telegram: `group_address` renamed to `destination_address`, to prepare support for other APCI services and add `source_address`
- Telegram: remove `Telegram.telegramtype` and replace with payload object derived from `xknx.telegram.apci.APCI`.
- CEMIFrame: remove `CEMIFrame.cmd`, which can be derived from `CEMIFrame.payload`.
- APCI: extend APCI services (e.g. `MemoryRead/Write/Response`, `PropertyRead/Write/Response`, etc).
- Farewell Travis CI; Welcome Github Actions!
- StateUpdater allow float values for `register_remote_value(tracker_options)` attribute.
- Handle exceptions from received unsupported or not implemented KNXIP Service Type identifiers
Expand Down
65 changes: 65 additions & 0 deletions examples/example_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Example on how to read mask version and properties from a KNX actor."""
import asyncio
import sys
from typing import List

from xknx import XKNX
from xknx.core import PayloadReader
from xknx.telegram import IndividualAddress
from xknx.telegram.apci import (
DeviceDescriptorRead,
DeviceDescriptorResponse,
PropertyValueRead,
PropertyValueResponse,
)


async def main(argv: List[str]):
"""Connect and read information from a KNX device. Requires a System B device."""
if len(argv) == 2:
address = IndividualAddress(argv[1])
else:
address = "1.1.1"

xknx = XKNX()
await xknx.start()

reader = PayloadReader(xknx, address)

# Read the mask version of the device (descriptor 0).
payload = await reader.send(
DeviceDescriptorRead(descriptor=0), response_class=DeviceDescriptorResponse
)
if payload is not None:
print("Mask version: %04x" % payload.value)

# Read the serial number of the device (object 0, property 11).
payload = await reader.send(
PropertyValueRead(object_index=0, property_id=11, count=1, start_index=1),
response_class=PropertyValueResponse,
)
if payload is not None:
print(
"Serial number: {:02x}{:02x}:{:02x}{:02x}{:02x}{:02x}".format(
payload.data[0],
payload.data[1],
payload.data[2],
payload.data[3],
payload.data[4],
payload.data[5],
)
)

# Check if the device is in programming mode (object 0, property 54).
payload = await reader.send(
PropertyValueRead(object_index=0, property_id=54, count=1, start_index=1),
response_class=PropertyValueResponse,
)
if payload is not None:
print("Programming mode: %s" % ("ON" if payload.data[0] else "OFF"))

await xknx.stop()


if __name__ == "__main__":
asyncio.run(main(sys.argv))
29 changes: 29 additions & 0 deletions examples/example_restart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Example on how to connect to restart a KNX device."""
import asyncio
import sys
from typing import List

from xknx import XKNX
from xknx.telegram import IndividualAddress, Telegram
from xknx.telegram.apci import Restart


async def main(argv: List[str]):
"""Restart a KNX device."""
if len(argv) != 2:
print(f"{argv[0]}: missing target address.")
return 1

address = IndividualAddress(argv[1])

xknx = XKNX()
await xknx.start()

await xknx.telegrams.put(Telegram(address, payload=Restart()))
await asyncio.sleep(2)

await xknx.stop()


if __name__ == "__main__":
asyncio.run(main(sys.argv))
9 changes: 8 additions & 1 deletion examples/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,11 @@
|[Sensor](./example_sensor.py)|Example for Sensor device|
|[Switch](./example_switch.py)|Example for Switch device|
|[Scene](./example_scene.py)|Example for switching a light on and off|
|[Weather](./example_weather.py)|Example for Weather device and reading sensor data|
|[Weather](./example_weather.py)|Example for Weather device and reading sensor data|

## Low-level

|Example|Description|
|-|-|
|[Info](./example_info.py)|Example on how to read device information such as serial and programming mode (depends on actor if supported)|
|[Restart](./example_restart.py)|Example on how to restart an actor|
83 changes: 83 additions & 0 deletions test/core_tests/payload_reader_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Unit test for payload reader."""
import asyncio
import unittest
from unittest.mock import MagicMock, patch

from xknx import XKNX
from xknx.core import PayloadReader
from xknx.telegram import IndividualAddress, Telegram, TelegramDirection
from xknx.telegram.apci import MemoryRead, MemoryResponse


class TestPayloadReader(unittest.TestCase):
"""Test class for payload reader."""

def setUp(self):
"""Set up test class."""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def tearDown(self):
"""Tear down test class."""
self.loop.close()

def create_telegram_queue_mock(self, xknx: XKNX, response_telegram: Telegram):
"""
Create a TelegramQueue mock that returns a specific response telegram.
"""
xknx.telegram_queue = MagicMock()

def _register_telegram_received_cb(func):
self.loop.create_task(func(response_telegram))

xknx.telegram_queue.register_telegram_received_cb.side_effect = (
_register_telegram_received_cb
)

def test_payload_reader_send_success(self):
"""Test payload reader: successful send."""
xknx = XKNX()

destination_address = IndividualAddress("1.2.3")
request_payload = MemoryRead(0xAABB, 3)
response_payload = MemoryResponse(0xAABB, 3, bytes([0x00, 0x11, 0x33]))

response_telegram = Telegram(
source_address=destination_address,
direction=TelegramDirection.INCOMING,
payload=response_payload,
)

self.create_telegram_queue_mock(xknx, response_telegram)

payload_reader = PayloadReader(xknx, destination_address)

payload = self.loop.run_until_complete(
payload_reader.send(request_payload, response_class=MemoryResponse)
)

# Response is received.
self.assertEqual(payload, response_payload)

@patch("logging.Logger.warning")
def test_payload_reader_send_timeout(self, logger_warning_mock):
"""Test payload reader: timeout while waiting for response."""
xknx = XKNX()

destination_address = IndividualAddress("1.2.3")
request_payload = MemoryRead(0xAABB, 3)

payload_reader = PayloadReader(xknx, destination_address, timeout_in_seconds=0)

payload = self.loop.run_until_complete(
payload_reader.send(request_payload, response_class=MemoryResponse)
)

# No response received.
self.assertEqual(payload, None)
# Warning was logged.
logger_warning_mock.assert_called_once_with(
"Error: KNX bus did not respond in time (%s secs) to payload request for: %s",
0.0,
IndividualAddress("1.2.3"),
)
24 changes: 13 additions & 11 deletions test/io_tests/tunnel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,8 @@ def test_tunnel_request_received(self, send_ack_mock):
send_ack_mock.assert_called_once_with(0x02, 0x21)

@patch("xknx.io.Tunnel._send_tunnelling_ack")
def test_tunnel_request_received_unsupported_frames(self, send_ack_mock):
def test_tunnel_request_received_cemi_too_small(self, send_ack_mock):
"""Test Tunnel sending ACK for unsupported frames."""
# LDataInd APciPhysAddrRead from 0.0.1 to 0/0/0 broadcast - ETS scan for devices in programming mode
# <UnsupportedCEMIMessage description="APCI not supported: 0b0100000000 in CEMI: 2900b0d000010000010100" />
# communication_channel_id: 0x02 sequence_counter: 0x4f
raw = bytes.fromhex("0610 0420 0015 04 02 4f 00 2900b0d000010000010100")

self.tunnel.udp_client.data_received_callback(raw)
self.tg_received_mock.assert_not_called()
send_ack_mock.assert_called_once_with(0x02, 0x4F)
send_ack_mock.reset_mock()

# LDataInd T_Connect from 1.0.250 to 1.0.255 (xknx tunnel endpoint) - ETS Line-Scan
# <UnsupportedCEMIMessage description="CEMI too small. Length: 10; CEMI: 2900b06010fa10ff0080" />
# communication_channel_id: 0x02 sequence_counter: 0x81
Expand All @@ -68,3 +58,15 @@ def test_tunnel_request_received_unsupported_frames(self, send_ack_mock):
self.tunnel.udp_client.data_received_callback(raw)
self.tg_received_mock.assert_not_called()
send_ack_mock.assert_called_once_with(0x02, 0x81)

@patch("xknx.io.Tunnel._send_tunnelling_ack")
def test_tunnel_request_received_apci_unsupported(self, send_ack_mock):
"""Test Tunnel sending ACK for unsupported frames."""
# LDataInd Unsupported Extended APCI from 0.0.1 to 0/0/0 broadcast
# <UnsupportedCEMIMessage description="APCI not supported: 0b1111111000 in CEMI: 2900b0d0000100000103f8" />
# communication_channel_id: 0x02 sequence_counter: 0x4f
raw = bytes.fromhex("0610 0420 0015 04 02 4f 00 2900b0d0000100000103f8")

self.tunnel.udp_client.data_received_callback(raw)
self.tg_received_mock.assert_not_called()
send_ack_mock.assert_called_once_with(0x02, 0x4F)
2 changes: 1 addition & 1 deletion test/knxip_tests/cemi_frame_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_valid_command(frame):


def test_invalid_tpci_apci(frame):
"""Test for invalid APCICommand"""
"""Test for invalid APCIService"""
with raises(UnsupportedCEMIMessage, match=r".*APCI not supported: .*"):
frame.from_knx_data_link_layer(get_data(0x29, 0, 0, 0, 0, 1, 0xFFC0, []))

Expand Down

0 comments on commit d404075

Please sign in to comment.