diff --git a/examples/example_tunnel.py b/examples/example_tunnel.py index 979adde32e..919c91c673 100644 --- a/examples/example_tunnel.py +++ b/examples/example_tunnel.py @@ -35,8 +35,7 @@ async def main(): gateway_port=gateway.port, ) - await tunnel.connect_udp() - await tunnel.connect() + await tunnel.start() await tunnel.send_telegram( Telegram( @@ -53,8 +52,7 @@ async def main(): ) await asyncio.sleep(2) - await tunnel.connectionstate() - await tunnel.disconnect() + await tunnel.stop() asyncio.run(main()) diff --git a/test/io_tests/tunnel_test.py b/test/io_tests/tunnel_test.py index 7f93147cac..ab7ba4018e 100644 --- a/test/io_tests/tunnel_test.py +++ b/test/io_tests/tunnel_test.py @@ -32,7 +32,7 @@ def tearDown(self): """Tear down test class.""" self.loop.close() - @patch("xknx.io.Tunnel.send_ack") + @patch("xknx.io.Tunnel._send_tunnelling_ack") def test_tunnel_request_received(self, send_ack_mock): """Test Tunnel for calling send_ack on normal frames.""" # LDataInd GroupValueWrite from 1.1.22 to to 5/1/22 with DPT9 payload 0C 3F @@ -47,7 +47,7 @@ def test_tunnel_request_received(self, send_ack_mock): self.tg_received_mock.assert_called_once_with(telegram) send_ack_mock.assert_called_once_with(0x02, 0x21) - @patch("xknx.io.Tunnel.send_ack") + @patch("xknx.io.Tunnel._send_tunnelling_ack") def test_tunnel_request_received_unsupported_frames(self, send_ack_mock): """Test Tunnel sending ACK for unsupported frames.""" # LDataInd APciPhysAddrRead from 0.0.1 to 0/0/0 broadcast - ETS scan for devices in programming mode diff --git a/xknx/io/connectionstate.py b/xknx/io/connectionstate.py index 403b273aa3..906a259882 100644 --- a/xknx/io/connectionstate.py +++ b/xknx/io/connectionstate.py @@ -1,6 +1,7 @@ """Abstraction to send ConnectonStateRequest and wait for ConnectionStateResponse.""" from xknx.knxip import HPAI, ConnectionStateRequest, ConnectionStateResponse, KNXIPFrame +from .const import CONNECTIONSTATE_REQUEST_TIMEOUT from .request_response import RequestResponse @@ -10,7 +11,12 @@ class ConnectionState(RequestResponse): def __init__(self, xknx, udp_client, communication_channel_id): """Initialize ConnectionState class.""" self.udp_client = udp_client - super().__init__(xknx, self.udp_client, ConnectionStateResponse) + super().__init__( + xknx, + self.udp_client, + ConnectionStateResponse, + timeout_in_seconds=CONNECTIONSTATE_REQUEST_TIMEOUT, + ) self.communication_channel_id = communication_channel_id def create_knxipframe(self) -> KNXIPFrame: diff --git a/xknx/io/const.py b/xknx/io/const.py index 7951cad305..fed3c37e3e 100644 --- a/xknx/io/const.py +++ b/xknx/io/const.py @@ -2,3 +2,7 @@ DEFAULT_MCAST_GRP = "224.0.23.12" DEFAULT_MCAST_PORT = 3671 + +CONNECTION_ALIVE_TIME = 120 +CONNECTIONSTATE_REQUEST_TIMEOUT = 10 +HEARTBEAT_RATE = CONNECTION_ALIVE_TIME - (CONNECTIONSTATE_REQUEST_TIMEOUT * 5) diff --git a/xknx/io/routing.py b/xknx/io/routing.py index 3fbd4aa0dd..8cf054ad92 100644 --- a/xknx/io/routing.py +++ b/xknx/io/routing.py @@ -73,7 +73,9 @@ async def send_knxipframe(self, knxipframe): async def start(self): """Start routing.""" await self.udpclient.connect() + self.xknx.connected.set() async def stop(self): """Stop routing.""" await self.udpclient.stop() + self.xknx.connected.clear() diff --git a/xknx/io/tunnel.py b/xknx/io/tunnel.py index d81fbb9e07..fabdd7684f 100644 --- a/xknx/io/tunnel.py +++ b/xknx/io/tunnel.py @@ -9,6 +9,8 @@ from xknx.exceptions import CommunicationError, XKNXException from xknx.knxip import ( CEMIMessageCode, + DisconnectRequest, + DisconnectResponse, KNXIPFrame, KNXIPServiceType, TunnellingAck, @@ -18,6 +20,7 @@ from .connect import Connect from .connectionstate import ConnectionState +from .const import HEARTBEAT_RATE from .disconnect import Disconnect from .tunnelling import Tunnelling from .udp_client import UDPClient @@ -71,77 +74,124 @@ def init_udp_client(self): ) self.udp_client.register_callback( - self.tunnel_reqest_received, [TunnellingRequest.service_type] + self._request_received, + [KNXIPServiceType.TUNNELLING_REQUEST, KNXIPServiceType.DISCONNECT_REQUEST], ) - def tunnel_reqest_received(self, knxipframe, udp_client): - """Handle incoming tunnel request.""" - # pylint: disable=unused-argument - if knxipframe.header.service_type_ident != KNXIPServiceType.TUNNELLING_REQUEST: - logger.warning("Service not implemented: %s", knxipframe) - else: - self.send_ack( - knxipframe.body.communication_channel_id, - knxipframe.body.sequence_counter, - ) - # Don't handle invalid cemi frames (None) and only handle incoming L_DATA_IND frames. - # Ignore L_DATA_CON confirmation frames. L_DATA_REQ frames should only be outgoing. - if ( - knxipframe.body.cemi is not None - and knxipframe.body.cemi.code is CEMIMessageCode.L_DATA_IND - ): - telegram = knxipframe.body.cemi.telegram - telegram.direction = TelegramDirection.INCOMING - if self.telegram_received_callback is not None: - self.telegram_received_callback(telegram) - - def send_ack(self, communication_channel_id, sequence_counter): - """Send tunnelling ACK after tunnelling request received.""" - ack = TunnellingAck( - self.xknx, - communication_channel_id=communication_channel_id, - sequence_counter=sequence_counter, - ) - self.udp_client.send(KNXIPFrame.init_from_body(ack)) + #################### + # + # CONNECT DISCONNECT + # + #################### async def start(self): - """Start tunneling.""" - await self.connect_udp() - await self.connect() + """Start tunneling. Returns True on success.""" + try: + await self.udp_client.connect() + await self._connect_request() + except (OSError, CommunicationError) as ex: + logger.debug( + "Could not establish connection to KNX/IP interface.", exc_info=True + ) + if self.auto_reconnect: + self._reconnect_task = asyncio.create_task(self._reconnect()) + return False + # close udp client to prevent open file descriptors + await self.udp_client.stop() + raise ex + else: + self._tunnel_established() + return True - async def connect_udp(self): - """Connect udp_client.""" - await self.udp_client.connect() + def _tunnel_established(self): + """Set up interface when the tunnel is ready.""" + self.sequence_number = 0 + self.xknx.connected.set() + self._stop_reconnect() + self.start_heartbeat() + + def _tunnel_lost(self): + """Prepare for reconnection or shutdown when the connection is lost. Callback.""" + self.xknx.connected.clear() + self.stop_heartbeat() + if self.auto_reconnect: + self._reconnect_task = asyncio.create_task(self._reconnect()) + else: + raise CommunicationError("Tunnel connection closed.") + + async def _reconnect(self): + """Reconnect to tunnel device.""" + # only send disconnect request if we ever were connected + if self.communication_channel is not None: + await self._disconnect_request(True) + self.communication_channel = None + await self.udp_client.stop() + await asyncio.sleep(self.auto_reconnect_wait) + if await self.start(): + logger.info("Successfully reconnected to KNX bus.") - async def connect(self): - """Connect/build tunnel.""" + def _stop_reconnect(self): + """Stop reconnect task if running.""" + if self._reconnect_task is not None: + self._reconnect_task.cancel() + self._reconnect_task = None + + async def stop(self): + """Stop tunneling.""" + self.xknx.connected.clear() + self.stop_heartbeat() + self._stop_reconnect() + if self.communication_channel is not None: + await self._disconnect_request(False) + self.communication_channel = None + await self.udp_client.stop() + + #################### + # + # OUTGOING REQUESTS + # + #################### + + async def _connect_request(self) -> bool: + """Connect to tunnelling server. Return True if succeeded.""" connect = Connect(self.xknx, self.udp_client) await connect.start() - if not connect.success: - if self.auto_reconnect: - logger.warning( - "Could not connect to KNX. Retry in %s seconds.", - self.auto_reconnect_wait, - ) - self._reconnect_task = asyncio.create_task(self.schedule_reconnect()) - return - raise CommunicationError( - "Could not establish connection", not self._is_reconnecting + if connect.success: + self.communication_channel = connect.communication_channel + # Use the individual address provided by the tunnelling server + self._src_address = IndividualAddress(connect.identifier) + logger.debug( + "Tunnel established communication_channel=%s, id=%s", + connect.communication_channel, + connect.identifier, ) + return True + raise CommunicationError("Could not establish connection") + + async def _connectionstate_request(self): + """Return state of tunnel. True if tunnel is in good shape.""" + conn_state = ConnectionState( + self.xknx, + self.udp_client, + communication_channel_id=self.communication_channel, + ) + await conn_state.start() + return conn_state.success + + async def _disconnect_request(self, ignore_error=False): + """Disconnect from tunnel device.""" + disconnect = Disconnect( + self.xknx, + self.udp_client, + communication_channel_id=self.communication_channel, + ) + await disconnect.start() + if not disconnect.success and not ignore_error: + raise XKNXException("Could not disconnect channel") logger.debug( - "Tunnel established communication_channel=%s, id=%s", - connect.communication_channel, - connect.identifier, + "Tunnel disconnected (communication_channel: %s)", + self.communication_channel, ) - if self._is_reconnecting: - logger.info("Successfully reconnected to KNX bus.") - self._reconnect_task = None - self._is_reconnecting = False - self.communication_channel = connect.communication_channel - # Use the individual address provided by the tunnelling server - self._src_address = IndividualAddress(connect.identifier) - self.sequence_number = 0 - await self.start_heartbeat() async def send_telegram(self, telegram): """ @@ -158,22 +208,25 @@ async def send_telegram(self, telegram): connection by sending a DISCONNECT_REQUEST frame to the other device’s control endpoint. """ - success = await self._send_telegram_impl(telegram) + success = await self._tunnelling_request(telegram) if not success: logger.debug("Sending of telegram failed. Retrying a second time.") - success = await self._send_telegram_impl(telegram) + success = await self._tunnelling_request(telegram) if not success: logger.debug("Resending telegram failed. Reconnecting to tunnel.") - await self.reconnect() - success = await self._send_telegram_impl(telegram) + # TODO: How to test this? + self._tunnel_lost() + if self._reconnect_task: + await self._reconnect_task + success = await self._tunnelling_request(telegram) if not success: raise CommunicationError( "Resending the telegram repeatedly failed.", False ) - self.increase_sequence_number() + self._increase_sequence_number() - async def _send_telegram_impl(self, telegram): - """Send Telegram to tunnelling device - implementation.""" + async def _tunnelling_request(self, telegram): + """Send Telegram to tunnelling device.""" tunnelling = Tunnelling( self.xknx, self.udp_client, @@ -185,112 +238,96 @@ async def _send_telegram_impl(self, telegram): await tunnelling.start() return tunnelling.success - def increase_sequence_number(self): + def _increase_sequence_number(self): """Increase sequence number.""" self.sequence_number += 1 if self.sequence_number == 256: self.sequence_number = 0 - async def connectionstate(self): - """Return state of tunnel. True if tunnel is in good shape.""" - conn_state = ConnectionState( + #################### + # + # INCOMING REQUESTS + # + #################### + + def _request_received(self, knxipframe, _udp_client): + """Handle incoming requests.""" + # pylint: disable=unused-argument + if knxipframe.header.service_type_ident is KNXIPServiceType.TUNNELLING_REQUEST: + self._tunnelling_request_received(knxipframe.body) + elif ( + knxipframe.header.service_type_ident is KNXIPServiceType.DISCONNECT_REQUEST + ): + self._disconnect_request_received(knxipframe.body) + else: + logger.warning("Service not implemented: %s", knxipframe) + + def _tunnelling_request_received(self, tunneling_request: TunnellingRequest): + """Handle incoming tunnel request.""" + self._send_tunnelling_ack( + tunneling_request.communication_channel_id, + tunneling_request.sequence_counter, + ) + # Don't handle invalid cemi frames (None) and only handle incoming L_DATA_IND frames. + # Ignore L_DATA_CON confirmation frames. L_DATA_REQ frames should only be outgoing. + if ( + tunneling_request.cemi is not None + and tunneling_request.cemi.code is CEMIMessageCode.L_DATA_IND + ): + telegram = tunneling_request.cemi.telegram + telegram.direction = TelegramDirection.INCOMING + if self.telegram_received_callback is not None: + self.telegram_received_callback(telegram) + + def _send_tunnelling_ack(self, communication_channel_id, sequence_counter): + """Send tunnelling ACK after tunnelling request received.""" + ack = TunnellingAck( self.xknx, - self.udp_client, - communication_channel_id=self.communication_channel, + communication_channel_id=communication_channel_id, + sequence_counter=sequence_counter, ) - await conn_state.start() - return conn_state.success + self.udp_client.send(KNXIPFrame.init_from_body(ack)) - async def disconnect(self, ignore_error=False): - """Disconnect from tunnel device.""" - # only send disconnect request if we ever were connected - if self.communication_channel is None: - # close udp client to prevent open file descriptors - await self.udp_client.stop() - return - disconnect = Disconnect( + def _disconnect_request_received(self, disconnect_request: DisconnectRequest): + """Handle incoming disconnect request.""" + logger.warning("Received DisconnectRequest from tunnelling sever.") + disconnect_response = DisconnectResponse( self.xknx, - self.udp_client, communication_channel_id=self.communication_channel, ) - await disconnect.start() - if not disconnect.success and not ignore_error: - raise XKNXException("Could not disconnect channel") - logger.debug( - "Tunnel disconnected (communication_channel: %s)", - self.communication_channel, - ) - # close udp client to prevent open file descriptors - await self.udp_client.stop() - - async def reconnect(self): - """Reconnect to tunnel device.""" - self._is_reconnecting = True - await self.stop_heartbeat() - await self.disconnect(True) - self.init_udp_client() - await self.start() - - async def schedule_reconnect(self): - """Schedule reconnect to KNX.""" - await asyncio.sleep(self.auto_reconnect_wait) - await self.reconnect() + self.udp_client.send(KNXIPFrame.init_from_body(disconnect_response)) + self.communication_channel = None + self._tunnel_lost() - async def stop_reconnect(self): - """Stop reconnect task if running.""" - if self._reconnect_task is not None: - self._reconnect_task.cancel() - self._reconnect_task = None + #################### + # + # HEARTBEAT + # + #################### - async def stop(self): - """Stop tunneling.""" - # XXX: set disconnect ignore_error True here. Is there actually anything - # which can happen if disconnect fails? normally this fails because - # we have no connection... - # await self.disconnect() - await self.stop_heartbeat() - await self.stop_reconnect() - await self.disconnect(True) - - async def start_heartbeat(self): + def start_heartbeat(self): """Start heartbeat for monitoring state of tunnel, as suggested by 03.08.02 KNX Core 5.4.""" self._heartbeat_task = asyncio.create_task(self.do_heartbeat()) - async def stop_heartbeat(self): + def stop_heartbeat(self): """Stop heartbeat task if running.""" if self._heartbeat_task is not None: self._heartbeat_task.cancel() self._heartbeat_task = None async def do_heartbeat(self): - """Heartbeat: Worker 'thread', endless loop for sending heartbeat requests.""" + """Heartbeat: Worker task, endless loop for sending heartbeat requests.""" while True: - await asyncio.sleep(15) - await self.do_heartbeat_impl() - - async def do_heartbeat_impl(self): - """Heartbeat: checking connection state and handling result.""" - connectionsstate = await self.connectionstate() - if connectionsstate: - await self.do_heartbeat_success() - else: - await self.do_heartbeat_failed() + await asyncio.sleep(HEARTBEAT_RATE) + if not await self._connectionstate_request(): + await self._do_heartbeat_failed() - async def do_heartbeat_success(self): - """Heartbeat: handling success.""" - self.number_heartbeat_failed = 0 - - async def do_heartbeat_failed(self): + async def _do_heartbeat_failed(self): """Heartbeat: handling error.""" - self.number_heartbeat_failed = self.number_heartbeat_failed + 1 - if self.number_heartbeat_failed > 3: - if not self._is_reconnecting: - logger.warning("Heartbeat to KNX bus failed. Reconnecting.") - try: - await self.stop_reconnect() - await self.reconnect() - self.number_heartbeat_failed = 0 - await self.stop_heartbeat() - except CommunicationError as exc: - if exc.should_log: - logger.warning(exc) + # first heartbeat failed - try 3 more times before disconnecting. + for _heartbeats_failed in range(3): + if await self._connectionstate_request(): + return + # 3 retries failed + logger.warning("Heartbeat to KNX bus failed. Reconnecting.") + self._tunnel_lost() diff --git a/xknx/io/udp_client.py b/xknx/io/udp_client.py index d9ef021ecf..aec875a28e 100644 --- a/xknx/io/udp_client.py +++ b/xknx/io/udp_client.py @@ -60,13 +60,11 @@ def datagram_received(self, data, addr): def error_received(self, exc): """Handle errors. Callback for error received.""" - if hasattr(self, "xknx"): - logger.warning("Error received: %s", exc) + logger.warning("Error received: %s", exc) def connection_lost(self, exc): """Log error. Callback for connection lost.""" - if hasattr(self, "xknx") and exc is not None: - logger.info("Closing transport.") + logger.debug("Closing transport.") def __init__( self, diff --git a/xknx/xknx.py b/xknx/xknx.py index d06f68d1c0..eff9a526f8 100644 --- a/xknx/xknx.py +++ b/xknx/xknx.py @@ -54,6 +54,7 @@ def __init__( self.state_updater = StateUpdater(self) self.knxip_interface = None self.started = asyncio.Event() + self.connected = asyncio.Event() self.address_format = address_format self.own_address = IndividualAddress(own_address) self.rate_limit = rate_limit