# tunnelling_request_test.py
"""Unit test for KNX/IP TunnellingRequest objects."""
import asyncio
import unittest
from xknx import XKNX
from xknx.dpt import DPTBinary
from xknx.exceptions import CouldNotParseKNXIP
from xknx.knxip import CEMIFrame, CEMIMessageCode, KNXIPFrame, TunnellingRequest
from xknx.telegram import GroupAddress, Telegram
from xknx.telegram.apci import GroupValueWrite
class Test_KNXIP_TunnelingReq(unittest.TestCase):
    """Test class for KNX/IP TunnellingRequest objects.

    Exercises three things: parsing a complete raw frame into a
    TunnellingRequest body, serializing an equivalent hand-built request
    back to the same bytes, and raising CouldNotParseKNXIP for frames that
    stop after the seventh byte.
    """

    # pylint: disable=too-many-public-methods,invalid-name

    def setUp(self):
        """Create a fresh event loop and install it as the current loop."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        """Close the event loop created in setUp."""
        self.loop.close()

    def test_connect_request(self):
        """Test parsing and streaming a tunnelling request KNX/IP packet."""
        raw = (
            0x06,
            0x10,
            0x04,
            0x20,
            0x00,
            0x15,
            0x04,
            0x01,
            0x17,
            0x00,
            0x11,
            0x00,
            0xBC,
            0xE0,
            0x00,
            0x00,
            0x48,
            0x08,
            0x01,
            0x00,
            0x81,
        )

        # Direction 1: raw bytes -> frame object.
        xknx = XKNX()
        knxipframe = KNXIPFrame(xknx)
        knxipframe.from_knx(raw)

        # assertIsInstance reports the actual type on failure, unlike
        # assertTrue(isinstance(...)) which only says "False is not true".
        self.assertIsInstance(knxipframe.body, TunnellingRequest)
        self.assertEqual(knxipframe.body.communication_channel_id, 1)
        self.assertEqual(knxipframe.body.sequence_counter, 23)
        self.assertIsInstance(knxipframe.body.cemi, CEMIFrame)

        self.assertEqual(
            knxipframe.body.cemi.telegram,
            Telegram(
                destination_address=GroupAddress("9/0/8"),
                payload=GroupValueWrite(DPTBinary(1)),
            ),
        )

        # Direction 2: build the same request by hand and check that it
        # serializes to exactly the bytes parsed above.
        cemi = CEMIFrame(xknx, code=CEMIMessageCode.L_DATA_REQ)
        cemi.telegram = Telegram(
            destination_address=GroupAddress("9/0/8"),
            payload=GroupValueWrite(DPTBinary(1)),
        )
        tunnelling_request = TunnellingRequest(
            xknx, communication_channel_id=1, sequence_counter=23, cemi=cemi
        )
        knxipframe2 = KNXIPFrame.init_from_body(tunnelling_request)

        # to_knx() is compared against a list, so the tuple is converted.
        self.assertEqual(knxipframe2.to_knx(), list(raw))

    def test_from_knx_wrong_header(self):
        """Test parsing and streaming wrong TunnellingRequest (wrong header length byte).

        The input is the first seven bytes of the frame used in
        test_connect_request, with the seventh byte changed from 0x04 to 0x03.
        """
        raw = (0x06, 0x10, 0x04, 0x20, 0x00, 0x15, 0x03)
        xknx = XKNX()
        knxipframe = KNXIPFrame(xknx)
        with self.assertRaises(CouldNotParseKNXIP):
            knxipframe.from_knx(raw)

    def test_from_knx_wrong_header2(self):
        """Test parsing and streaming wrong TunnellingRequest (wrong header length).

        The input is the unmodified first seven bytes of the frame used in
        test_connect_request; everything after the seventh byte is missing.
        """
        raw = (0x06, 0x10, 0x04, 0x20, 0x00, 0x15, 0x04)
        xknx = XKNX()
        knxipframe = KNXIPFrame(xknx)
        with self.assertRaises(CouldNotParseKNXIP):
            knxipframe.from_knx(raw)