-
-
Notifications
You must be signed in to change notification settings - Fork 96
/
request_response.py
92 lines (77 loc) · 3.53 KB
/
request_response.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""
Base class for sending a specific type of KNX/IP Packet to a KNX/IP device and waiting for the corresponding answer.
Will report if the corresponding answer was not received.
"""
import asyncio
import logging
from typing import TYPE_CHECKING, Optional, Type
from xknx.io.udp_client import UDPClient
from xknx.knxip import ErrorCode, KNXIPBodyResponse, KNXIPFrame
if TYPE_CHECKING:
from xknx.xknx import XKNX
# Module-level logger; every message emitted by this module goes to the "xknx.log" logger.
logger = logging.getLogger("xknx.log")
class RequestResponse:
    """Send a specific type of KNX/IP packet to a KNX/IP device and wait for the corresponding answer.

    This is a base class: derived classes implement `create_knxipframe` to
    build the request and may override `on_success_hook` / `on_error_hook`.

    After `start` has returned, `success` is True only if a response whose
    status code equals `ErrorCode.E_NO_ERROR` was received, and
    `response_status_code` holds the status code of the last accepted
    response (None if no response was accepted).
    """

    def __init__(
        self,
        xknx: "XKNX",
        udp_client: UDPClient,
        awaited_response_class: Type[KNXIPBodyResponse],
        timeout_in_seconds: float = 1.0,
    ):
        """Initialize RequestResponse class.

        Args:
            xknx: XKNX instance; only stored on the object by this class.
            udp_client: client used to send the request and to register the
                response callback on.
            awaited_response_class: body class a received frame must carry to
                be accepted as the answer; its `service_type` is used as the
                callback filter.
            timeout_in_seconds: maximum time `start` waits for the answer.
        """
        self.xknx = xknx
        self.udpclient = udp_client
        self.awaited_response_class: Type[KNXIPBodyResponse] = awaited_response_class
        # Set by response_rec_callback once a frame of the awaited class arrived.
        self.response_received_or_timeout = asyncio.Event()
        # True only after a response with ErrorCode.E_NO_ERROR was received.
        self.success = False
        self.timeout_in_seconds = timeout_in_seconds
        # Status code of the last accepted response; None until one arrives.
        self.response_status_code: Optional[ErrorCode] = None

    def create_knxipframe(self) -> KNXIPFrame:
        """Create KNX/IP Frame object to be sent to device.

        Raises:
            NotImplementedError: always, in this base class; derived classes
                have to override this method.
        """
        raise NotImplementedError("create_knxipframe has to be implemented")

    async def start(self) -> None:
        """Start. Send request and wait for an answer.

        Registers `response_rec_callback` on the UDP client for the service
        type of the awaited response class, sends the request and waits up to
        `timeout_in_seconds` for the response event. A timeout is logged at
        debug level and not raised; callers inspect `success` afterwards.

        The callback is unregistered in every case, including cancellation
        and errors raised while building or sending the request. Such errors
        propagate to the caller.
        """
        callb = self.udpclient.register_callback(
            self.response_rec_callback, [self.awaited_response_class.service_type]
        )
        try:
            # Sending happens inside the try/finally so that a failure while
            # building or sending the frame does not leave the callback registered.
            await self.send_request()
            try:
                await asyncio.wait_for(
                    self.response_received_or_timeout.wait(),
                    timeout=self.timeout_in_seconds,
                )
            except asyncio.TimeoutError:
                logger.debug(
                    "Error: KNX bus did not respond in time (%s secs) to request of type '%s'",
                    self.timeout_in_seconds,
                    self.__class__.__name__,
                )
        finally:
            # cleanup to not leave callbacks (for asyncio.CancelledError)
            self.udpclient.unregister_callback(callb)

    async def send_request(self) -> None:
        """Build knxipframe (within derived class) and send via UDP."""
        self.udpclient.send(self.create_knxipframe())

    def response_rec_callback(self, knxipframe: KNXIPFrame, _: UDPClient) -> None:
        """Verify and handle knxipframe. Callback from internal udpclient.

        Frames whose body is not an instance of the awaited response class are
        logged with a warning and ignored. Otherwise the status code is
        stored, the response event is set and the success or error hook is
        called depending on the status code. The second argument is unused.
        """
        if not isinstance(knxipframe.body, self.awaited_response_class):
            logger.warning("Could not understand knxipframe")
            return
        self.response_status_code = knxipframe.body.status_code
        self.response_received_or_timeout.set()
        if knxipframe.body.status_code == ErrorCode.E_NO_ERROR:
            self.success = True
            self.on_success_hook(knxipframe)
        else:
            self.on_error_hook(knxipframe)

    def on_success_hook(self, knxipframe: KNXIPFrame) -> None:
        """Do something after having received a valid answer. May be overwritten in derived class."""

    def on_error_hook(self, knxipframe: KNXIPFrame) -> None:
        """Do something after having received error within given time. May be overwritten in derived class.

        The default implementation only logs the received status code at debug level.
        """
        logger.debug(
            "Error: KNX bus responded to request of type '%s' with error in '%s': %s",
            self.__class__.__name__,
            self.awaited_response_class.__name__,
            knxipframe.body.status_code,  # type: ignore
        )