diff --git a/docs/configuration.md b/docs/configuration.md
index 7a859892f..a31ffcf51 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -18,11 +18,11 @@ The configuration file can contain three sections.
- The `connection` section can be used to specify the connection to the KNX interface.
- `auto` for automatic discovery of a KNX interface
- `tunneling` for a UDP unicast connection
- - `local_ip` (required) sets the ip address that is used by xknx
- `gateway_ip` (required) sets the ip address of the KNX tunneling interface
- `gateway_port` (optional) sets the port the KNX tunneling interface is listening on
+ - `local_ip` (optional) sets the ip address that is used by xknx
- `routing` for a UDP multicast connection
- - `local_ip` (required) sets the ip address that is used by xknx
+ - `local_ip` (optional) sets the ip address that is used by xknx
- Within the `groups` sections all devices are defined. For each type of device more then one section might be specified. You need to append numbers or strings to differentiate the entries, as in the example below. The appended number or string must be unique.
How to use
diff --git a/home-assistant-plugin/custom_components/xknx/__init__.py b/home-assistant-plugin/custom_components/xknx/__init__.py
index 30f7509cd..9696d1afa 100644
--- a/home-assistant-plugin/custom_components/xknx/__init__.py
+++ b/home-assistant-plugin/custom_components/xknx/__init__.py
@@ -44,12 +44,12 @@
TUNNELING_SCHEMA = vol.Schema({
vol.Required(CONF_HOST): cv.string,
- vol.Required(CONF_XKNX_LOCAL_IP): cv.string,
vol.Optional(CONF_PORT): cv.port,
+ vol.Optional(CONF_XKNX_LOCAL_IP): cv.string,
})
ROUTING_SCHEMA = vol.Schema({
- vol.Required(CONF_XKNX_LOCAL_IP): cv.string,
+ vol.Optional(CONF_XKNX_LOCAL_IP): cv.string,
})
EXPOSE_SCHEMA = vol.Schema({
diff --git a/test/config_test.py b/test/config_test.py
index 51a84adb7..9719ed9c6 100644
--- a/test/config_test.py
+++ b/test/config_test.py
@@ -64,6 +64,15 @@ def test_config_connection(self):
gateway_port=6000)
),
("""
+ connection:
+ tunneling:
+ gateway_ip: '192.168.1.2'
+ """,
+ ConnectionConfig(
+ connection_type=ConnectionType.TUNNELING,
+ gateway_ip="192.168.1.2")
+ ),
+ ("""
connection:
routing:
local_ip: '192.168.1.2'
@@ -71,6 +80,12 @@ def test_config_connection(self):
ConnectionConfig(
connection_type=ConnectionType.ROUTING,
local_ip="192.168.1.2")
+ ),
+ ("""
+ connection:
+ routing:
+ """,
+ ConnectionConfig(connection_type=ConnectionType.ROUTING)
)
]
for yaml_string, expected_conn in test_configs:
diff --git a/test/io_gateway_scanner_test.py b/test/io_gateway_scanner_test.py
new file mode 100644
index 000000000..669433329
--- /dev/null
+++ b/test/io_gateway_scanner_test.py
@@ -0,0 +1,120 @@
+"""Unit test for KNX/IP gateway scanner."""
+import asyncio
+import unittest
+
+from xknx import XKNX
+from xknx.io import GatewayScanFilter, GatewayScanner, UDPClient
+from xknx.io.gateway_scanner import GatewayDescriptor
+from xknx.knx import PhysicalAddress
+from xknx.knxip import (
+ HPAI, DIBDeviceInformation, DIBServiceFamily, DIBSuppSVCFamilies,
+ KNXIPFrame, KNXIPHeader, KNXIPServiceType, SearchResponse)
+
+
+class TestGatewayScanner(unittest.TestCase):
+ """Test class for xknx/io/GatewayScanner objects."""
+
+ def setUp(self):
+ """Set up test class."""
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+
+ def tearDown(self):
+ """Tear down test class."""
+ self.loop.close()
+
+ def test_gateway_scan_filter_match(self):
+ """Test match function of gateway filter."""
+ # pylint: disable=too-many-locals
+ gateway_1 = GatewayDescriptor(name='KNX-Interface',
+ ip_addr='10.1.1.11',
+ port=3761,
+ local_interface='en1',
+ local_ip='110.1.1.100',
+ supports_tunnelling=True,
+ supports_routing=False)
+ gateway_2 = GatewayDescriptor(name='KNX-Router',
+ ip_addr='10.1.1.12',
+ port=3761,
+ local_interface='en1',
+ local_ip='10.1.1.100',
+ supports_tunnelling=False,
+ supports_routing=True)
+ filter_tunnel = GatewayScanFilter(tunnelling=True)
+ filter_router = GatewayScanFilter(routing=True)
+ filter_name = GatewayScanFilter(name="KNX-Router")
+ filter_no_tunnel = GatewayScanFilter(tunnelling=False)
+
+ self.assertTrue(filter_tunnel.match(gateway_1))
+ self.assertFalse(filter_tunnel.match(gateway_2))
+ self.assertFalse(filter_router.match(gateway_1))
+ self.assertTrue(filter_router.match(gateway_2))
+ self.assertFalse(filter_name.match(gateway_1))
+ self.assertTrue(filter_name.match(gateway_2))
+ self.assertFalse(filter_no_tunnel.match(gateway_1))
+ self.assertTrue(filter_no_tunnel.match(gateway_2))
+
+ def test_search_response_reception(self):
+ """Test function of gateway scanner."""
+ # pylint: disable=protected-access
+ xknx = XKNX(loop=self.loop)
+ gateway_scanner = GatewayScanner(xknx)
+ search_response = fake_router_search_response(xknx)
+ udp_client = unittest.mock.create_autospec(UDPClient)
+ udp_client.local_addr = ("192.168.42.50", 0, "en1")
+ udp_client.getsockname.return_value = ("192.168.42.50", 0)
+ router_gw_descriptor = GatewayDescriptor(name="Gira KNX/IP-Router",
+ ip_addr="192.168.42.10",
+ port=3671,
+ local_interface="en1",
+ local_ip="192.168.42.50",
+ supports_tunnelling=True,
+ supports_routing=True)
+
+ self.assertEqual(gateway_scanner.found_gateways, [])
+ gateway_scanner._response_rec_callback(search_response, udp_client)
+ self.assertEqual(str(gateway_scanner.found_gateways[0]),
+ str(router_gw_descriptor))
+
+
+def fake_router_search_response(xknx: XKNX) -> SearchResponse:
+ """Return the SearchResponse of a KNX/IP Router."""
+ _frame_header = KNXIPHeader(xknx)
+ _frame_header.service_type_ident = KNXIPServiceType.SEARCH_RESPONSE
+ _frame_body = SearchResponse(xknx)
+ _frame_body.control_endpoint = HPAI(ip_addr="192.168.42.10", port=3671)
+
+ _device_information = DIBDeviceInformation()
+ _device_information.name = "Gira KNX/IP-Router"
+ _device_information.serial_number = "11:22:33:44:55:66"
+ _device_information.individual_address = PhysicalAddress("1.1.0")
+ _device_information.mac_address = "01:02:03:04:05:06"
+
+ _svc_families = DIBSuppSVCFamilies()
+ _svc_families.families.append(
+ DIBSuppSVCFamilies.Family(name=DIBServiceFamily.CORE,
+ version=1))
+ _svc_families.families.append(
+ DIBSuppSVCFamilies.Family(name=DIBServiceFamily.DEVICE_MANAGEMENT,
+ version=2))
+ _svc_families.families.append(
+ DIBSuppSVCFamilies.Family(name=DIBServiceFamily.TUNNELING,
+ version=1))
+ _svc_families.families.append(
+ DIBSuppSVCFamilies.Family(name=DIBServiceFamily.ROUTING,
+ version=1))
+ _svc_families.families.append(
+ DIBSuppSVCFamilies.Family(name=DIBServiceFamily.REMOTE_CONFIGURATION_DIAGNOSIS,
+ version=1))
+
+ _frame_body.dibs.append(_device_information)
+ _frame_body.dibs.append(_svc_families)
+ _frame_header.set_length(_frame_body)
+
+ search_response = KNXIPFrame(xknx)
+ search_response.init(KNXIPServiceType.SEARCH_RESPONSE)
+ search_response.header = _frame_header
+ search_response.body = _frame_body
+ search_response.normalize()
+
+ return search_response
diff --git a/test/knxip_search_request_test.py b/test/knxip_search_request_test.py
index 10c38bc66..70db8d61d 100644
--- a/test/knxip_search_request_test.py
+++ b/test/knxip_search_request_test.py
@@ -20,7 +20,7 @@ def tearDown(self):
"""Tear down test class."""
self.loop.close()
- def test_connect_request(self):
+ def test_search_request(self):
"""Test parsing and streaming SearchRequest KNX/IP packet."""
raw = ((0x06, 0x10, 0x02, 0x01, 0x00, 0x0e, 0x08, 0x01,
0xe0, 0x00, 0x17, 0x0c, 0x0e, 0x57))
diff --git a/test/knxip_search_response_test.py b/test/knxip_search_response_test.py
index 377ce0e6b..b4d0df0a6 100644
--- a/test/knxip_search_response_test.py
+++ b/test/knxip_search_response_test.py
@@ -22,7 +22,7 @@ def tearDown(self):
"""Tear down test class."""
self.loop.close()
- def test_connect_request(self):
+ def test_search_response(self):
"""Test parsing and streaming SearchResponse KNX/IP packet."""
raw = ((0x06, 0x10, 0x02, 0x02, 0x00, 0x50, 0x08, 0x01,
0xc0, 0xa8, 0x2a, 0x0a, 0x0e, 0x57, 0x36, 0x01,
diff --git a/test/str_test.py b/test/str_test.py
index 527f709a1..b860ede5e 100644
--- a/test/str_test.py
+++ b/test/str_test.py
@@ -10,6 +10,7 @@
from xknx.exceptions import (
ConversionError, CouldNotParseAddress, CouldNotParseKNXIP,
CouldNotParseTelegram, DeviceIllegalValue)
+from xknx.io.gateway_scanner import GatewayDescriptor
from xknx.knx import (
DPTArray, DPTBinary, GroupAddress, PhysicalAddress, Telegram)
from xknx.knxip import (
@@ -563,3 +564,20 @@ def test_knxip_frame(self):
'\n'
' body="" />')
+
+ #
+ # Gateway Scanner
+ #
+ def test_gateway_descriptor(self):
+ """Test string representation of GatewayDescriptor."""
+ gateway_descriptor = GatewayDescriptor(name='KNX-Interface',
+ ip_addr='192.168.2.3',
+ port=1234,
+ local_interface='en1',
+ local_ip='192.168.2.50',
+ supports_tunnelling=True,
+ supports_routing=False)
+ self.assertEqual(
+ str(gateway_descriptor),
+ ''
+ )
diff --git a/xknx/core/config.py b/xknx/core/config.py
index 47ba695ef..52feacf29 100644
--- a/xknx/core/config.py
+++ b/xknx/core/config.py
@@ -50,32 +50,34 @@ def parse_connection(self, doc):
and hasattr(doc["connection"], '__iter__'):
for conn, prefs in doc["connection"].items():
try:
- if conn.startswith("auto"):
- self._parse_connection_prefs(ConnectionType.AUTOMATIC, prefs)
- elif conn.startswith("tunneling"):
- self._parse_connection_prefs(ConnectionType.TUNNELING, prefs)
- elif conn.startswith("routing"):
- self._parse_connection_prefs(ConnectionType.ROUTING, prefs)
+ if conn == "tunneling":
+ if prefs is None or \
+ "gateway_ip" not in prefs:
+ raise XKNXException("`gateway_ip` is required for tunneling connection.")
+ conn_type = ConnectionType.TUNNELING
+ elif conn == "routing":
+ conn_type = ConnectionType.ROUTING
+ else:
+ conn_type = ConnectionType.AUTOMATIC
+ self._parse_connection_prefs(conn_type, prefs)
except XKNXException as ex:
self.xknx.logger.error("Error while reading config file: Could not parse %s: %s", conn, ex)
+ raise ex
- def _parse_connection_prefs(self, conn_type, prefs):
- conn = ConnectionConfig()
- conn.connection_type = conn_type
+ def _parse_connection_prefs(self, conn_type: ConnectionType, prefs) -> None:
+ connection_config = ConnectionConfig(connection_type=conn_type)
if hasattr(prefs, '__iter__'):
for pref, value in prefs.items():
try:
- if pref.startswith("local_ip"):
- conn.local_ip = value
- elif pref.startswith("gateway_ip"):
- conn.gateway_ip = value
- elif pref.startswith("gateway_port"):
- # don't overwrite default if None
- if value is not None:
- conn.gateway_port = value
+ if pref == "gateway_ip":
+ connection_config.gateway_ip = value
+ elif pref == "gateway_port":
+ connection_config.gateway_port = value
+ elif pref == "local_ip":
+ connection_config.local_ip = value
except XKNXException as ex:
self.xknx.logger.error("Error while reading config file: Could not parse %s: %s", pref, ex)
- self.xknx.connection_config = conn
+ self.xknx.connection_config = connection_config
def parse_groups(self, doc):
"""Parse the group section of xknx.yaml."""
diff --git a/xknx/io/gateway_scanner.py b/xknx/io/gateway_scanner.py
index 6ab951251..0c99f334b 100644
--- a/xknx/io/gateway_scanner.py
+++ b/xknx/io/gateway_scanner.py
@@ -43,7 +43,7 @@ def __init__(self,
def __str__(self):
"""Return object as readable string."""
- return ''.format(
self.name,
self.ip_addr,
self.port,
@@ -77,6 +77,10 @@ def match(self, gateway: GatewayDescriptor) -> bool:
return False
return True
+ def __eq__(self, other):
+ """Equality for GatewayScanFilter class (used in unit tests)."""
+ return self.__dict__ == other.__dict__
+
class GatewayScanner():
"""Class for searching KNX/IP devices."""
diff --git a/xknx/io/knxip_interface.py b/xknx/io/knxip_interface.py
index 770d7b3ef..bc73eb418 100644
--- a/xknx/io/knxip_interface.py
+++ b/xknx/io/knxip_interface.py
@@ -6,9 +6,11 @@
* provides callbacks after having received a telegram from the network.
"""
+import ipaddress
from enum import Enum
from platform import system as get_os_name
+import netifaces
from xknx.exceptions import XKNXException
from .const import DEFAULT_MCAST_PORT
@@ -62,19 +64,16 @@ def __init__(self,
self.gateway_port = gateway_port
self.auto_reconnect = auto_reconnect
self.auto_reconnect_wait = auto_reconnect_wait
- self.scan_filter = scan_filter
self.bind_to_multicast_addr = bind_to_multicast_addr
+ if connection_type == ConnectionType.TUNNELING:
+ scan_filter.tunnelling = True
+ elif connection_type == ConnectionType.ROUTING:
+ scan_filter.routing = True
+ self.scan_filter = scan_filter
def __eq__(self, other):
"""Equality for ConnectionConfig class (used in unit tests)."""
- return self.connection_type == other.connection_type and \
- self.local_ip == other.local_ip and \
- self.gateway_ip == other.gateway_ip and \
- self.gateway_port == other.gateway_port and \
- self.auto_reconnect == other.auto_reconnect and \
- self.auto_reconnect_wait == other.auto_reconnect_wait and \
- self.scan_filter == other.scan_filter and \
- self.bind_to_multicast_addr == other.bind_to_multicast_addr
+ return self.__dict__ == other.__dict__
class KNXIPInterface():
@@ -88,10 +87,8 @@ def __init__(self, xknx, connection_config=ConnectionConfig()):
async def start(self):
"""Start interface. Connecting KNX/IP device with the selected method."""
- if self.connection_config.connection_type == ConnectionType.AUTOMATIC:
- await self.start_automatic(
- self.connection_config.scan_filter)
- elif self.connection_config.connection_type == ConnectionType.ROUTING:
+ if self.connection_config.connection_type == ConnectionType.ROUTING and \
+ self.connection_config.local_ip is not None:
await self.start_routing(
self.connection_config.local_ip,
self.connection_config.bind_to_multicast_addr)
@@ -102,6 +99,8 @@ async def start(self):
self.connection_config.gateway_port,
self.connection_config.auto_reconnect,
self.connection_config.auto_reconnect_wait)
+ else:
+ await self.start_automatic(self.connection_config.scan_filter)
async def start_automatic(self, scan_filter: GatewayScanFilter):
"""Start GatewayScanner and connect to the found device."""
@@ -112,7 +111,8 @@ async def start_automatic(self, scan_filter: GatewayScanFilter):
raise XKNXException("No Gateways found")
gateway = gateways[0]
- if gateway.supports_tunnelling:
+ if gateway.supports_tunnelling and \
+ scan_filter.routing is not True:
await self.start_tunnelling(gateway.local_ip,
gateway.ip_addr,
gateway.port,
@@ -126,7 +126,13 @@ async def start_tunnelling(self, local_ip, gateway_ip, gateway_port,
auto_reconnect, auto_reconnect_wait):
"""Start KNX/IP tunnel."""
# pylint: disable=too-many-arguments
- self.xknx.logger.debug("Starting tunnel to %s:%s from %s", gateway_ip, gateway_port, local_ip)
+ try:
+ ipaddress.IPv4Address(gateway_ip)
+ except ipaddress.AddressValueError as ex:
+ raise XKNXException("Gateway IP address is not a valid IPv4 address.") from ex
+ if local_ip is None:
+ local_ip = self.find_local_ip(gateway_ip=gateway_ip)
+ self.xknx.logger.debug("Starting tunnel from %s to %s:%s", local_ip, gateway_ip, gateway_port)
self.interface = Tunnel(
self.xknx,
self.xknx.own_address,
@@ -162,3 +168,35 @@ def telegram_received(self, telegram):
async def send_telegram(self, telegram):
"""Send telegram to connected device (either Tunneling or Routing)."""
await self.interface.send_telegram(telegram)
+
+ def find_local_ip(self, gateway_ip: str) -> str:
+ """Find local IP address on same subnet as gateway."""
+ def _scan_interfaces(gateway: ipaddress.IPv4Address) -> str:
+ """Return local IP address on same subnet as given gateway."""
+ for interface in netifaces.interfaces():
+ try:
+ af_inet = netifaces.ifaddresses(interface)[netifaces.AF_INET]
+ for link in af_inet:
+ network = ipaddress.IPv4Network((link["addr"],
+ link["netmask"]),
+ strict=False)
+ if gateway in network:
+ self.xknx.logger.debug("Using interface: %s", interface)
+ return link["addr"]
+ except KeyError:
+ self.xknx.logger.debug("Could not find IPv4 address on interface %s", interface)
+ continue
+
+ def _find_default_gateway() -> ipaddress.IPv4Address:
+ """Return IP address of default gateway."""
+ gws = netifaces.gateways()
+ return ipaddress.IPv4Address(gws['default'][netifaces.AF_INET][0])
+
+ gateway = ipaddress.IPv4Address(gateway_ip)
+ local_ip = _scan_interfaces(gateway)
+ if local_ip is None:
+ self.xknx.logger.debug(
+ "No interface on same subnet as gateway found. Falling back to default gateway.")
+ default_gateway = _find_default_gateway()
+ local_ip = _scan_interfaces(default_gateway)
+ return local_ip