Skip to content

Commit

Permalink
Merge a64dd0a into bcbd1a2
Browse files Browse the repository at this point in the history
  • Loading branch information
farmio committed Dec 9, 2020
2 parents bcbd1a2 + a64dd0a commit 1be421f
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 146 deletions.
7 changes: 7 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ omit =

[report]
precision = 2
# Regexes for lines to exclude from consideration
exclude_lines =
# Don't complain about type checking imports:
if TYPE_CHECKING:

# Don't complain if tests don't hit defensive assertion code:
raise NotImplementedError
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Internals

- refactored timeout handling in GatewayScanner, RequestResponse and ValueReader.
- renamed "PhysicalAddress" to "IndividualAddress"
- Telegram: `group_address` renamed to `destination_address`, to prepare support for other APCI services and add `source_address`
- Farewell Travis CI; Welcome Github Actions!
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ warn_redundant_casts = true
warn_unused_configs = true

# add the modules below once we add typing for them so that we fail the build in the future if someone changes something without updating the typings
[mypy-xknx.telegram.*,xknx.exceptions.*,xknx.devices.travelcalculator]
[mypy-xknx.core.value_reader,xknx.devices.travelcalculator,xknx.exceptions.*,xknx.io.gateway_scanner,xknx.telegram.*,]
strict = true
ignore_errors = false
warn_unreachable = true
Expand Down
43 changes: 25 additions & 18 deletions test/core_tests/value_reader_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Unit test for value reader."""
import asyncio
import unittest
from unittest.mock import patch
from unittest.mock import MagicMock, patch

from xknx import XKNX
from xknx.core import ValueReader
Expand All @@ -21,8 +21,7 @@ def tearDown(self):
"""Tear down test class."""
self.loop.close()

@patch("xknx.core.ValueReader.timeout")
def test_value_reader_read_success(self, timeout_mock):
def test_value_reader_read_success(self):
"""Test value reader: successfull read."""
xknx = XKNX()
test_group_address = GroupAddress("0/0/0")
Expand All @@ -34,23 +33,15 @@ def test_value_reader_read_success(self, timeout_mock):
)

value_reader = ValueReader(xknx, test_group_address)
# Create a task for read() (3.5 compatible)
read_task = asyncio.ensure_future(value_reader.read())
# receive the response
self.loop.run_until_complete(value_reader.telegram_received(response_telegram))
# and yield the result
successfull_read = self.loop.run_until_complete(asyncio.gather(read_task))[0]
successfull_read = self.loop.run_until_complete(value_reader.read())

# GroupValueRead telegram is still in the queue because we are not actually processing it
self.assertEqual(xknx.telegrams.qsize(), 1)
# Callback was removed again
self.assertEqual(xknx.telegram_queue.telegram_received_cbs, [])
# Timeout handle was cancelled (cancelled method requires Python 3.7)
event_has_cancelled = getattr(value_reader.timeout_handle, "cancelled", None)
if callable(event_has_cancelled):
self.assertTrue(value_reader.timeout_handle.cancelled())
# timeout() was never called because there was no timeout
timeout_mock.assert_not_called()
# Telegram was received
self.assertEqual(value_reader.received_telegram, response_telegram)
# Successfull read() returns the telegram
Expand All @@ -60,7 +51,10 @@ def test_value_reader_read_success(self, timeout_mock):
def test_value_reader_read_timeout(self, logger_warning_mock):
"""Test value reader: read timeout."""
xknx = XKNX()
value_reader = ValueReader(xknx, GroupAddress("0/0/0"), timeout_in_seconds=0)
value_reader = ValueReader(xknx, GroupAddress("0/0/0"))
value_reader.response_received_or_timeout.wait = MagicMock(
side_effect=asyncio.TimeoutError()
)

timed_out_read = self.loop.run_until_complete(value_reader.read())

Expand All @@ -69,20 +63,33 @@ def test_value_reader_read_timeout(self, logger_warning_mock):
# Warning was logged
logger_warning_mock.assert_called_once_with(
"Error: KNX bus did not respond in time (%s secs) to GroupValueRead request for: %s",
0,
2.0,
GroupAddress("0/0/0"),
)
# Callback was removed again
self.assertEqual(xknx.telegram_queue.telegram_received_cbs, [])
# Timeout handle was cancelled (cancelled method requires Python 3.7)
event_has_cancelled = getattr(value_reader.timeout_handle, "cancelled", None)
if callable(event_has_cancelled):
self.assertTrue(value_reader.timeout_handle.cancelled())
# No telegram was received
self.assertIsNone(value_reader.received_telegram)
# Unsuccessfull read() returns None
self.assertIsNone(timed_out_read)

def test_value_reader_read_cancelled(self):
"""Test value reader: read cancelled."""
xknx = XKNX()
value_reader = ValueReader(xknx, GroupAddress("0/0/0"))
value_reader.response_received_or_timeout.wait = MagicMock(
side_effect=asyncio.CancelledError()
)
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(value_reader.read())

# GroupValueRead telegram is still in the queue because we are not actually processing it
self.assertEqual(xknx.telegrams.qsize(), 1)
# Callback was removed again
self.assertEqual(xknx.telegram_queue.telegram_received_cbs, [])
# No telegram was received
self.assertIsNone(value_reader.received_telegram)

def test_value_reader_send_group_read(self):
"""Test value reader: send_group_read."""
xknx = XKNX()
Expand Down
22 changes: 9 additions & 13 deletions test/io_tests/gateway_scanner_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Unit test for KNX/IP gateway scanner."""
import asyncio
import unittest
from unittest.mock import patch
from unittest.mock import MagicMock, patch

from xknx import XKNX
from xknx.io import GatewayScanFilter, GatewayScanner, UDPClient
Expand Down Expand Up @@ -128,11 +128,13 @@ def test_search_response_reception(self):
gateway_scanner = GatewayScanner(xknx)
test_search_response = fake_router_search_response(xknx)
udp_client_mock = unittest.mock.create_autospec(UDPClient)
udp_client_mock.local_addr = ("192.168.42.50", 0, "en1")
udp_client_mock.local_addr = ("192.168.42.50", 0)
udp_client_mock.getsockname.return_value = ("192.168.42.50", 0)

self.assertEqual(gateway_scanner.found_gateways, [])
gateway_scanner._response_rec_callback(test_search_response, udp_client_mock)
gateway_scanner._response_rec_callback(
test_search_response, udp_client_mock, interface="en1"
)
self.assertEqual(
str(gateway_scanner.found_gateways[0]), str(self.gateway_desc_both)
)
Expand All @@ -145,17 +147,11 @@ def test_scan_timeout(self, netifaces_mock):
# No interface shall be found
netifaces_mock.interfaces.return_value = []

gateway_scanner = GatewayScanner(xknx, timeout_in_seconds=0)

timed_out_scan = self.loop.run_until_complete(gateway_scanner.scan())

# Timeout handle was cancelled (cancelled method requires Python 3.7)
event_has_cancelled = getattr(
gateway_scanner._timeout_handle, "cancelled", None
gateway_scanner = GatewayScanner(xknx)
gateway_scanner._response_received_or_timeout.wait = MagicMock(
side_effect=asyncio.TimeoutError()
)
if callable(event_has_cancelled):
self.assertTrue(gateway_scanner._timeout_handle.cancelled())
self.assertTrue(gateway_scanner._response_received_or_timeout.is_set())
timed_out_scan = self.loop.run_until_complete(gateway_scanner.scan())
# Unsuccessfull scan() returns None
self.assertEqual(timed_out_scan, [])

Expand Down
45 changes: 44 additions & 1 deletion test/io_tests/request_response_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
"""Unit test for KNX/IP Disconnect Request/Response."""
import asyncio
import unittest
from unittest.mock import MagicMock, patch

from xknx import XKNX
from xknx.io import RequestResponse, UDPClient
from xknx.knxip import DisconnectResponse
from xknx.knxip import DisconnectResponse, KNXIPBody


class AsyncMock(MagicMock):
"""Async Mock."""

# pylint: disable=invalid-overridden-method
async def __call__(self, *args, **kwargs):
return super().__call__(*args, **kwargs)


class TestConnectResponse(unittest.TestCase):
Expand All @@ -28,3 +37,37 @@ def test_create_knxipframe_err(self):

with self.assertRaises(NotImplementedError):
self.loop.run_until_complete(request_response.start())

@patch("logging.Logger.debug")
@patch("xknx.io.RequestResponse.send_request", new_callable=AsyncMock)
def test_request_response_timeout(self, _send_request_mock, logger_debug_mock):
"""Test RequestResponse: timeout. No callback shall be left."""
xknx = XKNX()
udp_client = UDPClient(xknx, ("192.168.1.1", 0), ("192.168.1.2", 1234))
requ_resp = RequestResponse(xknx, udp_client, KNXIPBody)
requ_resp.response_received_or_timeout.wait = MagicMock(
side_effect=asyncio.TimeoutError()
)
self.loop.run_until_complete(requ_resp.start())
# Debug message was logged
logger_debug_mock.assert_called_once_with(
"Error: KNX bus did not respond in time (%s secs) to request of type '%s'",
1.0,
"RequestResponse",
)
# Callback was removed again
self.assertEqual(udp_client.callbacks, [])

@patch("xknx.io.RequestResponse.send_request", new_callable=AsyncMock)
def test_request_response_cancelled(self, _send_request_mock):
"""Test RequestResponse: task cancelled. No callback shall be left."""
xknx = XKNX()
udp_client = UDPClient(xknx, ("192.168.1.1", 0), ("192.168.1.2", 1234))
requ_resp = RequestResponse(xknx, udp_client, KNXIPBody)
requ_resp.response_received_or_timeout.wait = MagicMock(
side_effect=asyncio.CancelledError()
)
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(requ_resp.start())
# Callback was removed again
self.assertEqual(udp_client.callbacks, [])
65 changes: 30 additions & 35 deletions xknx/core/value_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,64 @@
"""
import asyncio
import logging
from typing import TYPE_CHECKING, Optional

from xknx.telegram import Telegram, TelegramType
from xknx.telegram import GroupAddress, Telegram, TelegramType

if TYPE_CHECKING:
from xknx.xknx import XKNX

logger = logging.getLogger("xknx.log")


class ValueReader:
"""Class for reading the value of a specific KNX group address from KNX bus."""

# pylint: disable=too-many-instance-attributes

def __init__(self, xknx, group_address, timeout_in_seconds=2):
def __init__(
self, xknx: "XKNX", group_address: GroupAddress, timeout_in_seconds: float = 2.0
):
"""Initialize ValueReader class."""
self.xknx = xknx
self.group_address = group_address
self.group_address: GroupAddress = group_address
self.response_received_or_timeout = asyncio.Event()
self.success = False
self.timeout_in_seconds = timeout_in_seconds
self.timeout_handle = None
self.received_telegram = None
self.success: bool = False
self.timeout_in_seconds: float = timeout_in_seconds
self.received_telegram: Optional[Telegram] = None

async def read(self):
async def read(self) -> Optional[Telegram]:
"""Send group read and wait for response."""
cb_obj = self.xknx.telegram_queue.register_telegram_received_cb(
self.telegram_received
)

await self.send_group_read()
await self.start_timeout()
await self.response_received_or_timeout.wait()
await self.stop_timeout()

self.xknx.telegram_queue.unregister_telegram_received_cb(cb_obj)
try:
await asyncio.wait_for(
self.response_received_or_timeout.wait(),
timeout=self.timeout_in_seconds,
)
except asyncio.TimeoutError:
logger.warning(
"Error: KNX bus did not respond in time (%s secs) to GroupValueRead request for: %s",
self.timeout_in_seconds,
self.group_address,
)
finally:
# cleanup to not leave callbacks (for asyncio.CancelledError)
self.xknx.telegram_queue.unregister_telegram_received_cb(cb_obj)

if not self.success:
return None
return self.received_telegram

async def send_group_read(self):
async def send_group_read(self) -> None:
"""Send group read."""
telegram = Telegram(
destination_address=self.group_address, telegramtype=TelegramType.GROUP_READ
)
await self.xknx.telegrams.put(telegram)

async def telegram_received(self, telegram):
async def telegram_received(self, telegram: Telegram) -> None:
"""Test if telegram has correct group address and trigger event."""
if (
telegram.destination_address == self.group_address
Expand All @@ -66,21 +79,3 @@ async def telegram_received(self, telegram):
self.success = True
self.received_telegram = telegram
self.response_received_or_timeout.set()

def timeout(self):
"""Handle timeout for not having received expected group response."""
logger.warning(
"Error: KNX bus did not respond in time (%s secs) to GroupValueRead request for: %s",
self.timeout_in_seconds,
self.group_address,
)
self.response_received_or_timeout.set()

async def start_timeout(self):
"""Start timeout. Register callback for no answer received within timeout."""
loop = asyncio.get_running_loop()
self.timeout_handle = loop.call_later(self.timeout_in_seconds, self.timeout)

async def stop_timeout(self):
"""Stop timeout."""
self.timeout_handle.cancel()
Loading

0 comments on commit 1be421f

Please sign in to comment.