diff --git a/Makefile b/Makefile index c1637acfb..b8a88d7d5 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ all: @echo "clean -- cleanup working directory" test: - PYTHONPATH="${PYTHONPATH}:/" python3 -m unittest discover -s test -p "*_test.py" -b -v + pytest build: @python3 setup.py sdist diff --git a/test/knxip_tests/connect_response_test.py b/test/knxip_tests/connect_response_test.py index a3ff660c8..82d748861 100644 --- a/test/knxip_tests/connect_response_test.py +++ b/test/knxip_tests/connect_response_test.py @@ -75,3 +75,20 @@ def test_from_knx_wrong_crd2(self): knxipframe = KNXIPFrame(xknx) with self.assertRaises(CouldNotParseKNXIP): knxipframe.from_knx(raw) + + def test_connect_response_connection_error(self): + """Test parsing and streaming connection response KNX/IP packet with error.""" + raw = ((0x06, 0x10, 0x02, 0x06, 0x00, 0x08, 0x00, 0x24)) + xknx = XKNX(loop=self.loop) + knxipframe = KNXIPFrame(xknx) + knxipframe.from_knx(raw) + self.assertTrue(isinstance(knxipframe.body, ConnectResponse)) + self.assertEqual(knxipframe.body.status_code, ErrorCode.E_NO_MORE_CONNECTIONS) + self.assertEqual(knxipframe.body.communication_channel, 0) + + knxipframe2 = KNXIPFrame(xknx) + knxipframe2.init(KNXIPServiceType.CONNECT_RESPONSE) + knxipframe2.body.status_code = ErrorCode.E_NO_MORE_CONNECTIONS + knxipframe2.normalize() + + self.assertEqual(knxipframe2.to_knx(), list(raw)) diff --git a/xknx/knxip/connect_response.py b/xknx/knxip/connect_response.py index 514103c15..1d96e81d6 100644 --- a/xknx/knxip/connect_response.py +++ b/xknx/knxip/connect_response.py @@ -33,8 +33,10 @@ def __init__(self, xknx): def calculated_length(self): """Get length of KNX/IP body.""" - return 2 + HPAI.LENGTH + \ - ConnectResponse.CRD_LENGTH + if self.status_code == ErrorCode.E_NO_ERROR: + return 2 + HPAI.LENGTH + \ + ConnectResponse.CRD_LENGTH + return 2 def from_knx(self, raw): """Parse/deserialize from KNX/IP raw data.""" @@ -52,8 +54,9 @@ def crd_from_knx(crd): self.status_code = ErrorCode(raw[1]) pos = 2 - pos += self.control_endpoint.from_knx(raw[pos:]) - pos += crd_from_knx(raw[pos:]) + if self.status_code == ErrorCode.E_NO_ERROR: + pos += self.control_endpoint.from_knx(raw[pos:]) + pos += crd_from_knx(raw[pos:]) return pos def to_knx(self): @@ -69,8 +72,11 @@ def crd_to_knx(): data = [] data.append(self.communication_channel) data.append(self.status_code.value) - data.extend(self.control_endpoint.to_knx()) - data.extend(crd_to_knx()) + + if self.status_code == ErrorCode.E_NO_ERROR: + data.extend(self.control_endpoint.to_knx()) + data.extend(crd_to_knx()) + return data def __str__(self):