Skip to content

Commit

Permalink
connect/disconnect instead of start/stop and reconnect when sending t…
Browse files Browse the repository at this point in the history
…elegram fails
  • Loading branch information
farmio committed Dec 21, 2020
1 parent 255e7ea commit de66e32
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 20 deletions.
4 changes: 2 additions & 2 deletions examples/example_tunnel.py
Expand Up @@ -35,7 +35,7 @@ async def main():
gateway_port=gateway.port,
)

await tunnel.start()
await tunnel.connect()

await tunnel.send_telegram(
Telegram(
Expand All @@ -52,7 +52,7 @@ async def main():
)
await asyncio.sleep(2)

await tunnel.stop()
await tunnel.disconnect()


asyncio.run(main())
26 changes: 26 additions & 0 deletions xknx/io/interface.py
@@ -0,0 +1,26 @@
"""
Abstract base for a specific KNX/IP connection (Tunneling or Routing).
* It handles connection and disconnections
* It starts and stops a udp client
* It packs Telegrams into KNX Frames and passes them to a udp client
"""
from abc import ABC, abstractmethod

from xknx.telegram import Telegram


class Interface(ABC):
"""Abstract base class for KNX/IP connections."""

@abstractmethod
async def connect(self) -> bool:
"""Connect to KNX bus. Returns True on success."""

@abstractmethod
async def disconnect(self) -> None:
"""Disconnect from KNX bus."""

@abstractmethod
async def send_telegram(self, telegram: Telegram) -> None:
"""Send Telegram to KNX bus."""
6 changes: 3 additions & 3 deletions xknx/io/knxip_interface.py
Expand Up @@ -150,19 +150,19 @@ async def start_tunnelling(
auto_reconnect=auto_reconnect,
auto_reconnect_wait=auto_reconnect_wait,
)
await self.interface.start()
await self.interface.connect()

async def start_routing(self, local_ip: str) -> None:
"""Start KNX/IP Routing."""
validate_ip(local_ip, address_name="Local IP address")
logger.debug("Starting Routing from %s as %s", local_ip, self.xknx.own_address)
self.interface = Routing(self.xknx, self.telegram_received, local_ip)
await self.interface.start()
await self.interface.connect()

async def stop(self) -> None:
"""Stop connected interfae (either Tunneling or Routing)."""
if self.interface is not None:
await self.interface.stop()
await self.interface.disconnect()
self.interface = None

def telegram_received(self, telegram):
Expand Down
20 changes: 16 additions & 4 deletions xknx/io/routing.py
Expand Up @@ -14,12 +14,13 @@
)
from xknx.telegram import TelegramDirection

from .interface import Interface
from .udp_client import UDPClient

logger = logging.getLogger("xknx.log")


class Routing:
class Routing(Interface):
"""Class for handling KNX/IP routing."""

def __init__(self, xknx, telegram_received_callback, local_ip):
Expand Down Expand Up @@ -70,12 +71,23 @@ async def send_knxipframe(self, knxipframe):
"""Send KNXIPFrame to connected routing device."""
self.udpclient.send(knxipframe)

async def start(self):
async def connect(self) -> bool:
"""Start routing."""
await self.udpclient.connect()
try:
await self.udpclient.connect()
except OSError as ex:
logger.debug(
"Could not establish connection to KNX/IP network. %s: %s",
type(ex).__name__,
ex,
)
# close udp client to prevent open file descriptors
await self.udpclient.stop()
raise ex
self.xknx.connected.set()
return True

async def stop(self):
async def disconnect(self):
"""Stop routing."""
await self.udpclient.stop()
self.xknx.connected.clear()
23 changes: 12 additions & 11 deletions xknx/io/tunnel.py
Expand Up @@ -22,13 +22,14 @@
from .connectionstate import ConnectionState
from .const import HEARTBEAT_RATE
from .disconnect import Disconnect
from .interface import Interface
from .tunnelling import Tunnelling
from .udp_client import UDPClient

logger = logging.getLogger("xknx.log")


class Tunnel:
class Tunnel(Interface):
"""Class for handling KNX/IP tunnels."""

# pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -84,8 +85,8 @@ def init_udp_client(self):
#
####################

async def start(self):
"""Start tunneling. Returns True on success."""
async def connect(self):
"""Connect to a KNX tunneling interface. Returns True on success."""
try:
await self.udp_client.connect()
await self._connect_request()
Expand All @@ -109,7 +110,7 @@ def _tunnel_established(self):
"""Set up interface when the tunnel is ready."""
self.sequence_number = 0
self.xknx.connected.set()
self._stop_reconnect()
# self._stop_reconnect()
self.start_heartbeat()

def _tunnel_lost(self):
Expand All @@ -129,7 +130,7 @@ async def _reconnect(self):
self.communication_channel = None
await self.udp_client.stop()
await asyncio.sleep(self.auto_reconnect_wait)
if await self.start():
if await self.connect():
logger.info("Successfully reconnected to KNX bus.")

def _stop_reconnect(self):
Expand All @@ -138,8 +139,8 @@ def _stop_reconnect(self):
self._reconnect_task.cancel()
self._reconnect_task = None

async def stop(self):
"""Stop tunneling."""
async def disconnect(self):
"""Disconnect tunneling connection."""
self.xknx.connected.clear()
self.stop_heartbeat()
self._stop_reconnect()
Expand Down Expand Up @@ -217,13 +218,13 @@ async def send_telegram(self, telegram):
if not success:
logger.debug("Resending telegram failed. Reconnecting to tunnel.")
# TODO: How to test this?
self._tunnel_lost()
if self._reconnect_task:
await self._reconnect_task
if self._reconnect_task is None or self._reconnect_task.done():
self._tunnel_lost()
await self.xknx.connected.wait()
success = await self._tunnelling_request(telegram)
if not success:
raise CommunicationError(
"Resending the telegram repeatedly failed.", False
"Resending the telegram repeatedly failed.", True
)
self._increase_sequence_number()

Expand Down

0 comments on commit de66e32

Please sign in to comment.