Skip to content

Commit

Permalink
Merge b3e2298 into cbc0c30
Browse files Browse the repository at this point in the history
  • Loading branch information
farmio committed Dec 21, 2020
2 parents cbc0c30 + b3e2298 commit 0c51936
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 11 deletions.
106 changes: 102 additions & 4 deletions test/knxip_tests/connect_response_test.py
Expand Up @@ -133,19 +133,117 @@ def test_from_knx_wrong_crd2(self):
with self.assertRaises(CouldNotParseKNXIP):
knxipframe.from_knx(raw)

def test_connect_response_connection_error(self):
"""Test parsing and streaming connection response KNX/IP packet with error."""
raw = (0x06, 0x10, 0x02, 0x06, 0x00, 0x08, 0x00, 0x24)
def test_connect_response_connection_error_gira(self):
"""
Test parsing and streaming connection response KNX/IP packet with error e_no_more_connections.
HPAI and CRD normal. This was received from Gira devices (2020).
"""
raw = (
0x06,
0x10,
0x02,
0x06,
0x00,
0x14,
0xC0,
0x24,
0x08,
0x01,
0x0A,
0x01,
0x00,
0x29,
0x0E,
0x57,
0x04,
0x04,
0x00,
0x00,
)
xknx = XKNX()
knxipframe = KNXIPFrame(xknx)
knxipframe.from_knx(raw)
self.assertTrue(isinstance(knxipframe.body, ConnectResponse))
self.assertEqual(knxipframe.body.status_code, ErrorCode.E_NO_MORE_CONNECTIONS)
self.assertEqual(knxipframe.body.communication_channel, 0)
self.assertEqual(knxipframe.body.communication_channel, 192)

knxipframe2 = KNXIPFrame(xknx)
knxipframe2.init(KNXIPServiceType.CONNECT_RESPONSE)
knxipframe2.body.status_code = ErrorCode.E_NO_MORE_CONNECTIONS
knxipframe2.body.communication_channel = 192
knxipframe2.body.request_type = ConnectRequestType.TUNNEL_CONNECTION
knxipframe2.body.control_endpoint = HPAI(ip_addr="10.1.0.41", port=3671)
knxipframe2.body.identifier = 0
knxipframe2.normalize()

self.assertEqual(knxipframe2.to_knx(), list(raw))

def test_connect_response_connection_error_lox(self):
"""
Test parsing and streaming connection response KNX/IP packet with error e_no_more_connections.
HPAI given, CRD all zero. This was received from Loxone device (2020).
"""
raw = (
0x06,
0x10,
0x02,
0x06,
0x00,
0x14,
0x00,
0x24,
0x08,
0x01,
0xC0,
0xA8,
0x01,
0x01,
0x0E,
0x57,
0x00,
0x00,
0x00,
0x00,
)
xknx = XKNX()
knxipframe = KNXIPFrame(xknx)
knxipframe.from_knx(raw)
self.assertTrue(isinstance(knxipframe.body, ConnectResponse))
self.assertEqual(knxipframe.body.status_code, ErrorCode.E_NO_MORE_CONNECTIONS)
self.assertEqual(knxipframe.body.communication_channel, 0)

# no further tests: the current API can't (and shouldn't) create such odd packets

def test_connect_response_connection_error_mdt(self):
"""
Test parsing and streaming connection response KNX/IP packet with error e_no_more_connections.
HPAI and CRD all zero. This was received from MDT device (2020).
"""
raw = (
0x06,
0x10,
0x02,
0x06,
0x00,
0x08,
0x00,
0x24,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
)
xknx = XKNX()
knxipframe = KNXIPFrame(xknx)
knxipframe.from_knx(raw)
self.assertTrue(isinstance(knxipframe.body, ConnectResponse))
self.assertEqual(knxipframe.body.status_code, ErrorCode.E_NO_MORE_CONNECTIONS)
self.assertEqual(knxipframe.body.communication_channel, 0)

# no further tests: the current API can't (and shouldn't) create such odd packets
13 changes: 6 additions & 7 deletions xknx/knxip/connect_response.py
Expand Up @@ -43,9 +43,7 @@ def __init__(

def calculated_length(self):
"""Get length of KNX/IP body."""
if self.status_code == ErrorCode.E_NO_ERROR:
return 2 + HPAI.LENGTH + ConnectResponse.CRD_LENGTH
return 2
return 2 + HPAI.LENGTH + ConnectResponse.CRD_LENGTH

def from_knx(self, raw):
"""Parse/deserialize from KNX/IP raw data."""
Expand All @@ -67,6 +65,9 @@ def crd_from_knx(crd):
if self.status_code == ErrorCode.E_NO_ERROR:
pos += self.control_endpoint.from_knx(raw[pos:])
pos += crd_from_knx(raw[pos:])
else:
# do not parse HPAI and CRD in case of errors - just check length
pos += len(raw[pos:])
return pos

def to_knx(self):
Expand All @@ -84,10 +85,8 @@ def crd_to_knx():
data = []
data.append(self.communication_channel)
data.append(self.status_code.value)

if self.status_code == ErrorCode.E_NO_ERROR:
data.extend(self.control_endpoint.to_knx())
data.extend(crd_to_knx())
data.extend(self.control_endpoint.to_knx())
data.extend(crd_to_knx())

return data

Expand Down

0 comments on commit 0c51936

Please sign in to comment.