/
connection.py
68 lines (58 loc) · 2.59 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""Manages a connection to the KNX bus."""
from enum import Enum
from typing import Optional
from .const import DEFAULT_MCAST_PORT
from .gateway_scanner import GatewayScanFilter
class ConnectionType(Enum):
"""Enum class for different types of KNX/IP Connections."""
AUTOMATIC = 0
TUNNELING = 1
ROUTING = 2
class ConnectionConfig:
"""
Connection configuration.
Handles:
* type of connection:
* AUTOMATIC for using GatewayScanner for searching and finding KNX/IP devices in the network.
* TUNNELING connect to a specific KNX/IP tunneling device.
* ROUTING use KNX/IP multicast routing.
* local_ip: Local ip of the interface though which KNXIPInterface should connect.
* gateway_ip: IP of KNX/IP tunneling device.
* gateway_port: Port of KNX/IP tunneling device.
* route_back: the KNXnet/IP Server shall use the IP address and the port number in the IP package
received as the target IP address or port number for the response to the KNXnet/IP Client.
* auto_reconnect: Auto reconnect to KNX/IP tunneling device if connection cannot be established.
* auto_reconnect_wait: Wait n seconds before trying to reconnect to KNX/IP tunneling device.
* scan_filter: For AUTOMATIC connection, limit scan with the given filter
"""
# pylint: disable=too-few-public-methods,too-many-instance-attributes
def __init__(
self,
connection_type: ConnectionType = ConnectionType.AUTOMATIC,
local_ip: Optional[str] = None,
local_port: int = 0,
gateway_ip: Optional[str] = None,
gateway_port: int = DEFAULT_MCAST_PORT,
route_back: bool = False,
auto_reconnect: bool = True,
auto_reconnect_wait: int = 3,
scan_filter: GatewayScanFilter = GatewayScanFilter(),
):
"""Initialize ConnectionConfig class."""
# pylint: disable=too-many-arguments
self.connection_type = connection_type
self.local_ip = local_ip
self.local_port = local_port
self.gateway_ip = gateway_ip
self.gateway_port = gateway_port
self.route_back = route_back
self.auto_reconnect = auto_reconnect
self.auto_reconnect_wait = auto_reconnect_wait
if connection_type == ConnectionType.TUNNELING:
scan_filter.tunnelling = True
elif connection_type == ConnectionType.ROUTING:
scan_filter.routing = True
self.scan_filter = scan_filter
def __eq__(self, other: object) -> bool:
"""Equality for ConnectionConfig class (used in unit tests)."""
return self.__dict__ == other.__dict__