-
-
Notifications
You must be signed in to change notification settings - Fork 96
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
3,072 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
"""Example on how to read mask version and properties from a KNX actor.""" | ||
import asyncio | ||
import sys | ||
from typing import List | ||
|
||
from xknx import XKNX | ||
from xknx.core import PayloadReader | ||
from xknx.telegram import IndividualAddress | ||
from xknx.telegram.apci import ( | ||
DeviceDescriptorRead, | ||
DeviceDescriptorResponse, | ||
PropertyValueRead, | ||
PropertyValueResponse, | ||
) | ||
|
||
|
||
async def main(argv: List[str]): | ||
"""Connect and read information from a KNX device. Requires a System B device.""" | ||
if len(argv) == 2: | ||
address = IndividualAddress(argv[1]) | ||
else: | ||
address = "1.1.1" | ||
|
||
xknx = XKNX() | ||
await xknx.start() | ||
|
||
reader = PayloadReader(xknx, address) | ||
|
||
# Read the mask version of the device (descriptor 0). | ||
payload = await reader.send( | ||
DeviceDescriptorRead(descriptor=0), response_class=DeviceDescriptorResponse | ||
) | ||
if payload is not None: | ||
print("Mask version: %04x" % payload.value) | ||
|
||
# Read the serial number of the device (object 0, property 11). | ||
payload = await reader.send( | ||
PropertyValueRead(object_index=0, property_id=11, count=1, start_index=1), | ||
response_class=PropertyValueResponse, | ||
) | ||
if payload is not None: | ||
print( | ||
"Serial number: {:02x}{:02x}:{:02x}{:02x}{:02x}{:02x}".format( | ||
payload.data[0], | ||
payload.data[1], | ||
payload.data[2], | ||
payload.data[3], | ||
payload.data[4], | ||
payload.data[5], | ||
) | ||
) | ||
|
||
# Check if the device is in programming mode (object 0, property 54). | ||
payload = await reader.send( | ||
PropertyValueRead(object_index=0, property_id=54, count=1, start_index=1), | ||
response_class=PropertyValueResponse, | ||
) | ||
if payload is not None: | ||
print("Programming mode: %s" % ("ON" if payload.data[0] else "OFF")) | ||
|
||
await xknx.stop() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main(sys.argv)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
"""Example on how to connect to restart a KNX device.""" | ||
import asyncio | ||
import sys | ||
from typing import List | ||
|
||
from xknx import XKNX | ||
from xknx.telegram import IndividualAddress, Telegram | ||
from xknx.telegram.apci import Restart | ||
|
||
|
||
async def main(argv: List[str]): | ||
"""Restart a KNX device.""" | ||
if len(argv) != 2: | ||
print(f"{argv[0]}: missing target address.") | ||
return 1 | ||
|
||
address = IndividualAddress(argv[1]) | ||
|
||
xknx = XKNX() | ||
await xknx.start() | ||
|
||
await xknx.telegrams.put(Telegram(address, payload=Restart())) | ||
await asyncio.sleep(2) | ||
|
||
await xknx.stop() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main(sys.argv)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
"""Unit test for payload reader.""" | ||
import asyncio | ||
import unittest | ||
from unittest.mock import MagicMock, patch | ||
|
||
from xknx import XKNX | ||
from xknx.core import PayloadReader | ||
from xknx.telegram import IndividualAddress, Telegram, TelegramDirection | ||
from xknx.telegram.apci import MemoryRead, MemoryResponse | ||
|
||
|
||
class TestPayloadReader(unittest.TestCase): | ||
"""Test class for payload reader.""" | ||
|
||
def setUp(self): | ||
"""Set up test class.""" | ||
self.loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(self.loop) | ||
|
||
def tearDown(self): | ||
"""Tear down test class.""" | ||
self.loop.close() | ||
|
||
def create_telegram_queue_mock(self, xknx: XKNX, response_telegram: Telegram): | ||
""" | ||
Create a TelegramQueue mock that returns a specific response telegram. | ||
""" | ||
xknx.telegram_queue = MagicMock() | ||
|
||
def _register_telegram_received_cb(func): | ||
self.loop.create_task(func(response_telegram)) | ||
|
||
xknx.telegram_queue.register_telegram_received_cb.side_effect = ( | ||
_register_telegram_received_cb | ||
) | ||
|
||
def test_payload_reader_send_success(self): | ||
"""Test payload reader: successful send.""" | ||
xknx = XKNX() | ||
|
||
destination_address = IndividualAddress("1.2.3") | ||
request_payload = MemoryRead(0xAABB, 3) | ||
response_payload = MemoryResponse(0xAABB, 3, bytes([0x00, 0x11, 0x33])) | ||
|
||
response_telegram = Telegram( | ||
source_address=destination_address, | ||
direction=TelegramDirection.INCOMING, | ||
payload=response_payload, | ||
) | ||
|
||
self.create_telegram_queue_mock(xknx, response_telegram) | ||
|
||
payload_reader = PayloadReader(xknx, destination_address) | ||
|
||
payload = self.loop.run_until_complete( | ||
payload_reader.send(request_payload, response_class=MemoryResponse) | ||
) | ||
|
||
# Response is received. | ||
self.assertEqual(payload, response_payload) | ||
|
||
@patch("logging.Logger.warning") | ||
def test_payload_reader_send_timeout(self, logger_warning_mock): | ||
"""Test payload reader: timeout while waiting for response.""" | ||
xknx = XKNX() | ||
|
||
destination_address = IndividualAddress("1.2.3") | ||
request_payload = MemoryRead(0xAABB, 3) | ||
|
||
payload_reader = PayloadReader(xknx, destination_address, timeout_in_seconds=0) | ||
|
||
payload = self.loop.run_until_complete( | ||
payload_reader.send(request_payload, response_class=MemoryResponse) | ||
) | ||
|
||
# No response received. | ||
self.assertEqual(payload, None) | ||
# Warning was logged. | ||
logger_warning_mock.assert_called_once_with( | ||
"Error: KNX bus did not respond in time (%s secs) to payload request for: %s", | ||
0.0, | ||
IndividualAddress("1.2.3"), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.