From c81a71dc95243bf2d8560739f5df97e748629668 Mon Sep 17 00:00:00 2001 From: buergi Date: Sat, 21 Nov 2020 19:11:01 +0100 Subject: [PATCH 1/7] Added DPTControl classes and RemoteValueControl --- test/dpt_tests/dpt_1byte_control_test.py | 247 ++++++++++++++++++ .../remote_value_control_test.py | 76 ++++++ xknx/dpt/__init__.py | 9 + xknx/dpt/dpt_1byte_control.py | 242 +++++++++++++++++ xknx/remote_value/__init__.py | 1 + xknx/remote_value/remote_value_control.py | 71 +++++ 6 files changed, 646 insertions(+) create mode 100644 test/dpt_tests/dpt_1byte_control_test.py create mode 100644 test/remote_value_tests/remote_value_control_test.py create mode 100644 xknx/dpt/dpt_1byte_control.py create mode 100644 xknx/remote_value/remote_value_control.py diff --git a/test/dpt_tests/dpt_1byte_control_test.py b/test/dpt_tests/dpt_1byte_control_test.py new file mode 100644 index 0000000000..3ce68640b3 --- /dev/null +++ b/test/dpt_tests/dpt_1byte_control_test.py @@ -0,0 +1,247 @@ +"""Unit test for DPTControl objects.""" +import unittest + +from xknx.dpt import ( + DPTControl, + DPTControlStartStop, + DPTControlStartStopBlinds, + DPTControlStartStopDimming, + DPTControlStepwise, +) +from xknx.exceptions import ConversionError + + +class TestDPTControl(unittest.TestCase): + """Test class for DPTControl objects.""" + + def test_to_knx(self): + """Test serializing values to DPTControl.""" + for rawref in range(16): + control = 1 if rawref >> 3 else 0 + raw = DPTControl.to_knx({"control": control, "step_code": rawref & 0x07}) + self.assertEqual(raw, rawref) + + def test_to_knx_inverted(self): + """Test serializing values to DPTControl in inverted mode.""" + for rawref in range(16): + control = 0 if rawref >> 3 else 1 + raw = DPTControl.to_knx( + {"control": control, "step_code": rawref & 0x07}, invert=True + ) + self.assertEqual(raw, rawref) + + def test_to_knx_wrong_type(self): + """Test serializing wrong type to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx("") + with self.assertRaises(ConversionError): + DPTControl.to_knx(0) + + def test_to_knx_wrong_keys(self): + """Test serializing map with missing keys to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 0}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"step_code": 0}) + + def test_to_knx_wrong_value_types(self): + """Test serializing map with keys of invalid type to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": ""}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"step_code": ""}) + + def test_to_knx_wrong_values(self): + """Test serializing map with keys of invalid values to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": -1, "step_code": 0}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 2, "step_code": 0}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 0, "step_code": -1}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 0, "step_code": 0x8}) + + def test_from_knx(self): + """Test parsing DPTControl types from KNX.""" + for raw in range(16): + control = 1 if raw >> 3 else 0 + valueref = {"control": control, "step_code": raw & 0x07} + value = DPTControl.from_knx(raw) + self.assertEqual(value, valueref) + + def test_from_knx_inverted(self): + """Test parsing DPTControl types from KNX.""" + for raw in range(16): + control = 0 if raw >> 3 else 1 + valueref = {"control": control, "step_code": raw & 0x07} + value = DPTControl.from_knx(raw, invert=True) + self.assertEqual(value, valueref) + + def test_from_knx_wrong_value(self): + """Test parsing invalid DPTControl type from KNX.""" + with self.assertRaises(ConversionError): + DPTControl.from_knx(0x1F) + + def test_unit(self): + """Test unit_of_measurement function.""" + self.assertEqual(DPTControl.unit, "") + + +class TestDPTControlStepwise(unittest.TestCase): + """Test class for DPTControlStepwise objects.""" + + def test_to_knx(self): + """Test serializing values to DPTControlStepwise.""" + self.assertEqual(DPTControlStepwise.to_knx(1), 0xF) + self.assertEqual(DPTControlStepwise.to_knx(3), 0xE) + self.assertEqual(DPTControlStepwise.to_knx(6), 0xD) + self.assertEqual(DPTControlStepwise.to_knx(12), 0xC) + self.assertEqual(DPTControlStepwise.to_knx(25), 0xB) + self.assertEqual(DPTControlStepwise.to_knx(50), 0xA) + self.assertEqual(DPTControlStepwise.to_knx(100), 0x9) + self.assertEqual(DPTControlStepwise.to_knx(-1), 0x7) + self.assertEqual(DPTControlStepwise.to_knx(-3), 0x6) + self.assertEqual(DPTControlStepwise.to_knx(-6), 0x5) + self.assertEqual(DPTControlStepwise.to_knx(-12), 0x4) + self.assertEqual(DPTControlStepwise.to_knx(-25), 0x3) + self.assertEqual(DPTControlStepwise.to_knx(-50), 0x2) + self.assertEqual(DPTControlStepwise.to_knx(-100), 0x1) + self.assertEqual(DPTControlStepwise.to_knx(0), 0x0) + + def test_to_knx_wrong_type(self): + """Test serializing wrong type to DPTControlStepwise.""" + with self.assertRaises(ConversionError): + DPTControlStepwise.to_knx("") + + def test_from_knx(self): + """Test parsing DPTControlStepwise types from KNX.""" + self.assertEqual(DPTControlStepwise.from_knx(0xF), 1) + self.assertEqual(DPTControlStepwise.from_knx(0xE), 3) + self.assertEqual(DPTControlStepwise.from_knx(0xD), 6) + self.assertEqual(DPTControlStepwise.from_knx(0xC), 12) + self.assertEqual(DPTControlStepwise.from_knx(0xB), 25) + self.assertEqual(DPTControlStepwise.from_knx(0xA), 50) + self.assertEqual(DPTControlStepwise.from_knx(0x9), 100) + self.assertEqual(DPTControlStepwise.from_knx(0x8), 0) + self.assertEqual(DPTControlStepwise.from_knx(0x7), -1) + self.assertEqual(DPTControlStepwise.from_knx(0x6), -3) + self.assertEqual(DPTControlStepwise.from_knx(0x5), -6) + self.assertEqual(DPTControlStepwise.from_knx(0x4), -12) + self.assertEqual(DPTControlStepwise.from_knx(0x3), -25) + self.assertEqual(DPTControlStepwise.from_knx(0x2), -50) + self.assertEqual(DPTControlStepwise.from_knx(0x1), -100) + self.assertEqual(DPTControlStepwise.from_knx(0x0), 0) + + def test_from_knx_wrong_value(self): + """Test parsing invalid DPTControlStepwise type from KNX.""" + with self.assertRaises(ConversionError): + DPTControlStepwise.from_knx(0x1F) + + def test_unit(self): + """Test unit_of_measurement function.""" + self.assertEqual(DPTControlStepwise.unit, "%") + + +class TestDPTControlStartStop(unittest.TestCase): + """Test class for DPTControlStartStop objects.""" + + def test_mode_to_knx(self): + """Test serializing dimming commands to KNX.""" + self.assertEqual( + DPTControlStartStopDimming.to_knx( + DPTControlStartStopDimming.Direction.INCREASE + ), + 9, + ) + self.assertEqual( + DPTControlStartStopDimming.to_knx( + DPTControlStartStopDimming.Direction.DECREASE + ), + 1, + ) + self.assertEqual( + DPTControlStartStopDimming.to_knx( + DPTControlStartStopDimming.Direction.STOP + ), + 0, + ) + + def test_mode_to_knx_wrong_value(self): + """Test serializing invalid data type to KNX.""" + with self.assertRaises(ConversionError): + DPTControlStartStopDimming.to_knx(1) + + def test_mode_from_knx(self): + """Test parsing dimming commands from KNX.""" + for i in range(16): + if i > 8: + expected_direction = DPTControlStartStopDimming.Direction.INCREASE + elif i in (0, 8): + expected_direction = DPTControlStartStopDimming.Direction.STOP + elif i < 8: + expected_direction = DPTControlStartStopDimming.Direction.DECREASE + self.assertEqual(DPTControlStartStopDimming.from_knx(i), expected_direction) + + def test_mode_from_knx_wrong_value(self): + """Test serializing invalid data type to KNX.""" + with self.assertRaises(ConversionError): + DPTControlStartStopDimming.from_knx((1, 2)) + + def test_direction_names(self): + """Test names of Direction Enum.""" + self.assertEqual(str(DPTControlStartStop.Direction.INCREASE), "Increase") + self.assertEqual(str(DPTControlStartStop.Direction.DECREASE), "Decrease") + self.assertEqual(str(DPTControlStartStop.Direction.STOP), "Stop") + + +class TestDPTControlStartStopDimming(TestDPTControlStartStop): + """Test class for DPTControlStartStopDimming objects.""" + + def test_direction_names(self): + """Test names of Direction Enum.""" + self.assertEqual(str(DPTControlStartStopDimming.Direction.INCREASE), "Increase") + self.assertEqual(str(DPTControlStartStopDimming.Direction.DECREASE), "Decrease") + self.assertEqual(str(DPTControlStartStopDimming.Direction.STOP), "Stop") + + def test_direction_values(self): + """Test values of Direction Enum.""" + # pylint: disable=no-member + self.assertEqual( + DPTControlStartStopDimming.Direction.DECREASE.value, + DPTControlStartStop.Direction.DECREASE.value, + ) + self.assertEqual( + DPTControlStartStopDimming.Direction.INCREASE.value, + DPTControlStartStop.Direction.INCREASE.value, + ) + self.assertEqual( + DPTControlStartStopDimming.Direction.STOP.value, + DPTControlStartStop.Direction.STOP.value, + ) + + +class TestDPTControlStartStopBlinds(unittest.TestCase): + """Test class for DPTControlStartStopBlinds objects.""" + + def test_direction_names(self): + """Test names of Direction Enum.""" + self.assertEqual(str(DPTControlStartStopBlinds.Direction.DOWN), "Down") + self.assertEqual(str(DPTControlStartStopBlinds.Direction.UP), "Up") + self.assertEqual(str(DPTControlStartStopBlinds.Direction.STOP), "Stop") + + def test_direction_values(self): + """Test values of Direction Enum.""" + # pylint: disable=no-member + self.assertEqual( + DPTControlStartStopBlinds.Direction.UP.value, + DPTControlStartStop.Direction.DECREASE.value, + ) + self.assertEqual( + DPTControlStartStopBlinds.Direction.DOWN.value, + DPTControlStartStop.Direction.INCREASE.value, + ) + self.assertEqual( + DPTControlStartStopBlinds.Direction.STOP.value, + DPTControlStartStop.Direction.STOP.value, + ) diff --git a/test/remote_value_tests/remote_value_control_test.py b/test/remote_value_tests/remote_value_control_test.py new file mode 100644 index 0000000000..159e3931da --- /dev/null +++ b/test/remote_value_tests/remote_value_control_test.py @@ -0,0 +1,76 @@ +"""Unit test for RemoteValueControl objects.""" +import asyncio +import unittest + +from xknx import XKNX +from xknx.dpt import DPTArray, DPTBinary +from xknx.exceptions import ConversionError, CouldNotParseTelegram +from xknx.remote_value import RemoteValueControl +from xknx.telegram import GroupAddress, Telegram + + +class TestRemoteValueControl(unittest.TestCase): + """Test class for RemoteValueControl objects.""" + + def setUp(self): + """Set up test class.""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + """Tear down test class.""" + self.loop.close() + + def test_wrong_value_type(self): + """Test initializing with wrong value_type.""" + xknx = XKNX() + with self.assertRaises(ConversionError): + RemoteValueControl(xknx, value_type="wrong_value_type") + + def test_valid_payload(self): + """Test valid_payload method.""" + self.assertTrue(DPTBinary(0)) + self.assertTrue(DPTArray([0])) + + def test_set(self): + """Test setting value.""" + xknx = XKNX() + remote_value = RemoteValueControl( + xknx, group_address=GroupAddress("1/2/3"), value_type="stepwise" + ) + self.loop.run_until_complete(asyncio.Task(remote_value.set(25))) + self.assertEqual(xknx.telegrams.qsize(), 1) + telegram = xknx.telegrams.get_nowait() + self.assertEqual( + telegram, Telegram(GroupAddress("1/2/3"), payload=DPTBinary(0xB)) + ) + + def test_process(self): + """Test process telegram.""" + xknx = XKNX() + remote_value = RemoteValueControl( + xknx, group_address=GroupAddress("1/2/3"), value_type="stepwise" + ) + telegram = Telegram(group_address=GroupAddress("1/2/3"), payload=DPTBinary(0xB)) + self.assertEqual(remote_value.value, None) + self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) + self.assertEqual(remote_value.value, 25) + + def test_to_process_error(self): + """Test process errornous telegram.""" + xknx = XKNX() + remote_value = RemoteValueControl( + xknx, group_address=GroupAddress("1/2/3"), value_type="stepwise" + ) + with self.assertRaises(CouldNotParseTelegram): + telegram = Telegram( + group_address=GroupAddress("1/2/3"), payload=DPTArray(0x01) + ) + self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) + with self.assertRaises(ConversionError): + telegram = Telegram( + group_address=GroupAddress("1/2/3"), payload=DPTBinary(0x10) + ) + self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) + # pylint: disable=pointless-statement + remote_value.value diff --git a/xknx/dpt/__init__.py b/xknx/dpt/__init__.py index b5bfff829c..bb886b69b9 100644 --- a/xknx/dpt/__init__.py +++ b/xknx/dpt/__init__.py @@ -6,6 +6,15 @@ """ # flake8: noqa from .dpt import DPTArray, DPTBase, DPTBinary, DPTComparator +from .dpt_1byte_control import ( + DPTControl, + DPTControlStartStop, + DPTControlStartStopBlinds, + DPTControlStartStopDimming, + DPTControlStepwise, + DPTControlStepwiseBlinds, + DPTControlStepwiseDimming, +) from .dpt_1byte_signed import DPTPercentV8, DPTSignedRelativeValue, DPTValue1Count from .dpt_1byte_uint import ( DPTDecimalFactor, diff --git a/xknx/dpt/dpt_1byte_control.py b/xknx/dpt/dpt_1byte_control.py new file mode 100644 index 0000000000..3605d5c6f9 --- /dev/null +++ b/xknx/dpt/dpt_1byte_control.py @@ -0,0 +1,242 @@ +"""Implementation of Basic KNX DPT B1U3 Values (DPT 3.007/3.008). + +Very good source of information for interpretation of the standard is +https://library.e.abb.com/public/78c74aa86d4648b7b9d918485cd4621a/2CDC500051M0203_ApplicationHB_Lighting_EN.pdf#page=34 + +There are two separate dimming modes sharing the same DPT class: + + * Stepwise dimming + The full brightness range is divided into 2^(stepcode-1) intervals. + The value is always rounded to full interval boundary, i.e. 30% +25% = 50%, 50% +25% = 75%, 30% -25% = 25% + + * Start-stop dimming + Dimming is started with -/+100% (0x1/0x9) and keeps dimming until a STOP diagram (0x0/0x8) is received. + +As the same payload in these cases in interpreted completely different it is reasonable to make separate DPT classes. +""" +from enum import Enum + +from xknx.exceptions import ConversionError + +from .dpt import DPTBase + + +class DPTControl(DPTBase): + """Abstraction for KNX B1U3 values (DPT 3.007/3.008).""" + + # APCI (application layer control information) + APCI_CONTROLMASK = 0x08 + APCI_STEPCODEMASK = 0x07 + APCI_MAX_VALUE = APCI_CONTROLMASK | APCI_STEPCODEMASK + + payload_length = 1 + unit = "" + + @classmethod + def _encode(cls, control, step_code): + """Encode control-bit with step-code.""" + value = 1 if control > 0 else 0 + value = (value << 3) | (step_code & cls.APCI_STEPCODEMASK) + return value + + @classmethod + def _decode(cls, value): + """Decode value into control-bit and step-code.""" + control = 1 if (value & cls.APCI_CONTROLMASK) != 0 else 0 + step_code = value & cls.APCI_STEPCODEMASK + return control, step_code + + @classmethod + def _test_boundaries(cls, raw): + """Test if raw KNX data is within defined range for this object.""" + if isinstance(raw, int): + return 0 <= raw <= cls.APCI_MAX_VALUE + return False + + @classmethod + def _test_values(cls, control, step_code): + """Test if input values are valid.""" + if isinstance(control, int) and isinstance(step_code, int): + if control in (0, 1) and 0 <= step_code <= cls.APCI_STEPCODEMASK: + return True + return False + + @classmethod + def to_knx(cls, value, invert=False): + """Serialize to KNX/IP raw data.""" + if not isinstance(value, dict): + raise ConversionError( + "Cant serialize %s; invalid value type" % cls.__name__, value=value + ) + + try: + control = value["control"] + step_code = value["step_code"] + except KeyError: + raise ConversionError( + "Cant serialize %s; invalid keys" % cls.__name__, value=value + ) + + if not cls._test_values(control, step_code): + raise ConversionError( + "Cant serialize %s; invalid values" % cls.__name__, value=value + ) + + if invert: + control = 0 if control > 0 else 1 + + return cls._encode(control, step_code) + + @classmethod + def from_knx(cls, raw, invert=False): + """Parse/deserialize from KNX/IP raw data.""" + if not cls._test_boundaries(raw): + raise ConversionError("Cant parse %s" % cls.__name__, raw=raw) + + control, step_code = cls._decode(raw) + + if invert: + control = 0 if control > 0 else 1 + + return {"control": control, "step_code": step_code} + + +class DPTControlStepwise(DPTControl): + """Abstraction for KNX DPT 3.xxx in stepwise mode with conversion to an incement value.""" + + unit = "%" + + @staticmethod + def _from_increment(value): + """Calculate control bit and stepcode as defined in the KNX standard section 3.3.1 from an increment value.""" + # control bit in KNX standard + # 0: - = decrease/move up + # 1: + = increase/move down + control = 0 if value <= 0 else 1 + + stepcode = ( + 0 # special case = break indication (e.g. stop dimming/moving blinds) + ) + if abs(value) >= 100: + stepcode = 1 + elif abs(value) >= 50: + stepcode = 2 + elif abs(value) >= 25: + stepcode = 3 + elif abs(value) >= 12: + stepcode = 4 + elif abs(value) >= 6: + stepcode = 5 + elif abs(value) >= 3: + stepcode = 6 + elif abs(value) >= 1: + stepcode = 7 + + return {"control": control, "step_code": stepcode} + + @staticmethod + def _to_increment(value): + """Calculate the increment value from the stepcode and control bit as defined in the KNX standard section 3.3.1.""" + # calculated using floor(100/2^((value&0x07)-1)) + inc = [0, 100, 50, 25, 12, 6, 3, 1][value["step_code"] & 0x07] + return inc if value["control"] == 1 else -inc + + @classmethod + def to_knx(cls, value, invert=False): + """Serialize to KNX/IP raw data.""" + if not isinstance(value, int): + raise ConversionError("Cant serialize %s" % cls.__name__, value=value) + + return super().to_knx(cls._from_increment(value), invert) + + @classmethod + def from_knx(cls, raw, invert=False): + """Parse/deserialize from KNX/IP raw data.""" + return cls._to_increment(super().from_knx(raw, invert)) + + +class DPTControlStepwiseDimming(DPTControlStepwise): + """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in stepwise mode.""" + + +class DPTControlStepwiseBlinds(DPTControlStepwise): + """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in stepwise mode.""" + + +class TitleEnum(Enum): + """Enum with a descriptive string representation. + + Ensures values are rendered nicely, e.g. in home assistant. + """ + + def __str__(self): + """Return string representation.""" + # pylint: disable=no-member + return self.name.title() + + +class DPTControlStartStop(DPTControl): + """Abstraction for KNX DPT 3.xxx in start/stop mode.""" + + unit = "" + + class Direction(TitleEnum): + """Enum for indicating the direction.""" + + DECREASE = 0 + INCREASE = 1 + STOP = 2 + + @classmethod + def to_knx(cls, value, invert=False): + """Convert value to payload.""" + control = 0 + step_code = 0 + if value == cls.Direction(1): # INCREASE/DOWN + control = 1 + step_code = 1 + elif value == cls.Direction(0): # DECREASE/UP + control = 0 + step_code = 1 + elif value == cls.Direction(2): # STOP + control = 0 + step_code = 0 + else: + raise ConversionError("Cant serialize %s" % cls.__name__, value=value) + + values = {"control": control, "step_code": step_code} + return super().to_knx(values, invert) + + @classmethod + def from_knx(cls, raw, invert=False): + """Convert current payload to value.""" + values = super().from_knx(raw, invert) + if values["step_code"] == 0: + return cls.Direction(2) # STOP + if values["control"] == 0: + return cls.Direction(0) # DECREASE/UP + return cls.Direction(1) # INCREASE/DOWN + + +class DPTControlStartStopDimming(DPTControlStartStop): + """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in start/stop mode.""" + + # redefining Direction enum ensures proper typing, e.g. + # DPTControlStartStop.Direction.INCREASE != DPTControlStartStopDimming.Direction.INCREASE + class Direction(TitleEnum): + """Enum for indicating the direction.""" + + DECREASE = 0 + INCREASE = 1 + STOP = 2 + + +class DPTControlStartStopBlinds(DPTControlStartStop): + """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in start/stop mode.""" + + class Direction(TitleEnum): + """Enum for indicating the direction.""" + + UP = 0 + DOWN = 1 + STOP = 2 diff --git a/xknx/remote_value/__init__.py b/xknx/remote_value/__init__.py index 9ea8a0d690..a7d72b8fea 100644 --- a/xknx/remote_value/__init__.py +++ b/xknx/remote_value/__init__.py @@ -9,6 +9,7 @@ ) from .remote_value_color_rgb import RemoteValueColorRGB from .remote_value_color_rgbw import RemoteValueColorRGBW +from .remote_value_control import RemoteValueControl from .remote_value_datetime import RemoteValueDateTime from .remote_value_dpt_2_byte_unsigned import RemoteValueDpt2ByteUnsigned from .remote_value_dpt_value_1_ucount import RemoteValueDptValue1Ucount diff --git a/xknx/remote_value/remote_value_control.py b/xknx/remote_value/remote_value_control.py new file mode 100644 index 0000000000..e490410db0 --- /dev/null +++ b/xknx/remote_value/remote_value_control.py @@ -0,0 +1,71 @@ +""" +Module for managing a control remote value. + +Examples are switching commands with priority control, relative dimming or blinds control commands. +DPT 2.yyy and DPT 3.yyy +""" +from xknx.dpt import ( + DPTBinary, + DPTControl, + DPTControlStartStop, + DPTControlStartStopBlinds, + DPTControlStartStopDimming, + DPTControlStepwise, + DPTControlStepwiseBlinds, + DPTControlStepwiseDimming, +) +from xknx.exceptions import ConversionError + +from .remote_value import RemoteValue + + +class RemoteValueControl(RemoteValue): + """Abstraction for remote value used for controling.""" + + DPTMAP = { + "control": DPTControl, + "startstop": DPTControlStartStop, + "startstop_dimming": DPTControlStartStopDimming, + "startstop_blinds": DPTControlStartStopBlinds, + "stepwise": DPTControlStepwise, + "stepwise_dimming": DPTControlStepwiseDimming, + "stepwise_blinds": DPTControlStepwiseBlinds, + } + + def __init__( + self, + xknx, + group_address=None, + group_address_state=None, + value_type=None, + device_name=None, + after_update_cb=None, + invert=False, + ): + """Initialize control remote value.""" + # pylint: disable=too-many-arguments + super().__init__( + xknx, + group_address, + group_address_state, + device_name=device_name, + after_update_cb=after_update_cb, + ) + self.invert = invert + if value_type not in self.DPTMAP: + raise ConversionError( + "invalid value type", value_type=value_type, device_name=device_name + ) + self.value_type = value_type + + def payload_valid(self, payload): + """Test if telegram payload may be parsed.""" + return isinstance(payload, DPTBinary) + + def to_knx(self, value): + """Convert value to payload.""" + return DPTBinary(self.DPTMAP[self.value_type].to_knx(value, invert=self.invert)) + + def from_knx(self, payload): + """Convert current payload to value.""" + return self.DPTMAP[self.value_type].from_knx(payload.value, invert=self.invert) From f05b2f6c3b9506e865e5d4b0369297ed1828a631 Mon Sep 17 00:00:00 2001 From: buergi Date: Sat, 21 Nov 2020 21:40:07 +0100 Subject: [PATCH 2/7] Update DPT mapping in RemoteValueControl --- xknx/dpt/dpt_1byte_control.py | 19 +++++++++++++- xknx/remote_value/remote_value_control.py | 31 +++++------------------ 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/xknx/dpt/dpt_1byte_control.py b/xknx/dpt/dpt_1byte_control.py index 3605d5c6f9..6ef475060d 100644 --- a/xknx/dpt/dpt_1byte_control.py +++ b/xknx/dpt/dpt_1byte_control.py @@ -29,8 +29,9 @@ class DPTControl(DPTBase): APCI_STEPCODEMASK = 0x07 APCI_MAX_VALUE = APCI_CONTROLMASK | APCI_STEPCODEMASK - payload_length = 1 + value_type = "control" unit = "" + payload_length = 1 @classmethod def _encode(cls, control, step_code): @@ -104,6 +105,9 @@ def from_knx(cls, raw, invert=False): class DPTControlStepwise(DPTControl): """Abstraction for KNX DPT 3.xxx in stepwise mode with conversion to an incement value.""" + dpt_main_number = 3 + dpt_sub_number = None + value_type = "stepwise" unit = "%" @staticmethod @@ -158,10 +162,18 @@ def from_knx(cls, raw, invert=False): class DPTControlStepwiseDimming(DPTControlStepwise): """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in stepwise mode.""" + dpt_main_number = 3 + dpt_sub_number = 7 + value_type = "stepwise_dimming" + class DPTControlStepwiseBlinds(DPTControlStepwise): """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in stepwise mode.""" + dpt_main_number = 3 + dpt_sub_number = 8 + value_type = "stepwise_blinds" + class TitleEnum(Enum): """Enum with a descriptive string representation. @@ -178,6 +190,7 @@ def __str__(self): class DPTControlStartStop(DPTControl): """Abstraction for KNX DPT 3.xxx in start/stop mode.""" + value_type = "startstop" unit = "" class Direction(TitleEnum): @@ -221,6 +234,8 @@ def from_knx(cls, raw, invert=False): class DPTControlStartStopDimming(DPTControlStartStop): """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in start/stop mode.""" + value_type = "startstop_dimming" + # redefining Direction enum ensures proper typing, e.g. # DPTControlStartStop.Direction.INCREASE != DPTControlStartStopDimming.Direction.INCREASE class Direction(TitleEnum): @@ -234,6 +249,8 @@ class Direction(TitleEnum): class DPTControlStartStopBlinds(DPTControlStartStop): """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in start/stop mode.""" + value_type = "startstop_blinds" + class Direction(TitleEnum): """Enum for indicating the direction.""" diff --git a/xknx/remote_value/remote_value_control.py b/xknx/remote_value/remote_value_control.py index e490410db0..30765b71d4 100644 --- a/xknx/remote_value/remote_value_control.py +++ b/xknx/remote_value/remote_value_control.py @@ -4,16 +4,7 @@ Examples are switching commands with priority control, relative dimming or blinds control commands. DPT 2.yyy and DPT 3.yyy """ -from xknx.dpt import ( - DPTBinary, - DPTControl, - DPTControlStartStop, - DPTControlStartStopBlinds, - DPTControlStartStopDimming, - DPTControlStepwise, - DPTControlStepwiseBlinds, - DPTControlStepwiseDimming, -) +from xknx.dpt import DPTBase, DPTBinary from xknx.exceptions import ConversionError from .remote_value import RemoteValue @@ -22,16 +13,6 @@ class RemoteValueControl(RemoteValue): """Abstraction for remote value used for controling.""" - DPTMAP = { - "control": DPTControl, - "startstop": DPTControlStartStop, - "startstop_dimming": DPTControlStartStopDimming, - "startstop_blinds": DPTControlStartStopBlinds, - "stepwise": DPTControlStepwise, - "stepwise_dimming": DPTControlStepwiseDimming, - "stepwise_blinds": DPTControlStepwiseBlinds, - } - def __init__( self, xknx, @@ -52,11 +33,13 @@ def __init__( after_update_cb=after_update_cb, ) self.invert = invert - if value_type not in self.DPTMAP: + # pylint: disable=too-many-arguments + _dpt_class = DPTBase.parse_transcoder(value_type) + if _dpt_class is None: raise ConversionError( "invalid value type", value_type=value_type, device_name=device_name ) - self.value_type = value_type + self.dpt_class = _dpt_class def payload_valid(self, payload): """Test if telegram payload may be parsed.""" @@ -64,8 +47,8 @@ def payload_valid(self, payload): def to_knx(self, value): """Convert value to payload.""" - return DPTBinary(self.DPTMAP[self.value_type].to_knx(value, invert=self.invert)) + return DPTBinary(self.dpt_class.to_knx(value, invert=self.invert)) def from_knx(self, payload): """Convert current payload to value.""" - return self.DPTMAP[self.value_type].from_knx(payload.value, invert=self.invert) + return self.dpt_class.from_knx(payload.value, invert=self.invert) From dd9eaf301f49b5c31ad92e2076ad1e4cf434dd57 Mon Sep 17 00:00:00 2001 From: buergi Date: Sat, 21 Nov 2020 19:11:01 +0100 Subject: [PATCH 3/7] Added DPTControl classes and RemoteValueControl --- test/dpt_tests/dpt_1byte_control_test.py | 247 ++++++++++++++++++ .../remote_value_control_test.py | 76 ++++++ xknx/dpt/__init__.py | 9 + xknx/dpt/dpt_1byte_control.py | 242 +++++++++++++++++ xknx/remote_value/__init__.py | 1 + xknx/remote_value/remote_value_control.py | 71 +++++ 6 files changed, 646 insertions(+) create mode 100644 test/dpt_tests/dpt_1byte_control_test.py create mode 100644 test/remote_value_tests/remote_value_control_test.py create mode 100644 xknx/dpt/dpt_1byte_control.py create mode 100644 xknx/remote_value/remote_value_control.py diff --git a/test/dpt_tests/dpt_1byte_control_test.py b/test/dpt_tests/dpt_1byte_control_test.py new file mode 100644 index 0000000000..3ce68640b3 --- /dev/null +++ b/test/dpt_tests/dpt_1byte_control_test.py @@ -0,0 +1,247 @@ +"""Unit test for DPTControl objects.""" +import unittest + +from xknx.dpt import ( + DPTControl, + DPTControlStartStop, + DPTControlStartStopBlinds, + DPTControlStartStopDimming, + DPTControlStepwise, +) +from xknx.exceptions import ConversionError + + +class TestDPTControl(unittest.TestCase): + """Test class for DPTControl objects.""" + + def test_to_knx(self): + """Test serializing values to DPTControl.""" + for rawref in range(16): + control = 1 if rawref >> 3 else 0 + raw = DPTControl.to_knx({"control": control, "step_code": rawref & 0x07}) + self.assertEqual(raw, rawref) + + def test_to_knx_inverted(self): + """Test serializing values to DPTControl in inverted mode.""" + for rawref in range(16): + control = 0 if rawref >> 3 else 1 + raw = DPTControl.to_knx( + {"control": control, "step_code": rawref & 0x07}, invert=True + ) + self.assertEqual(raw, rawref) + + def test_to_knx_wrong_type(self): + """Test serializing wrong type to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx("") + with self.assertRaises(ConversionError): + DPTControl.to_knx(0) + + def test_to_knx_wrong_keys(self): + """Test serializing map with missing keys to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 0}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"step_code": 0}) + + def test_to_knx_wrong_value_types(self): + """Test serializing map with keys of invalid type to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": ""}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"step_code": ""}) + + def test_to_knx_wrong_values(self): + """Test serializing map with keys of invalid values to DPTControl.""" + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": -1, "step_code": 0}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 2, "step_code": 0}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 0, "step_code": -1}) + with self.assertRaises(ConversionError): + DPTControl.to_knx({"control": 0, "step_code": 0x8}) + + def test_from_knx(self): + """Test parsing DPTControl types from KNX.""" + for raw in range(16): + control = 1 if raw >> 3 else 0 + valueref = {"control": control, "step_code": raw & 0x07} + value = DPTControl.from_knx(raw) + self.assertEqual(value, valueref) + + def test_from_knx_inverted(self): + """Test parsing DPTControl types from KNX.""" + for raw in range(16): + control = 0 if raw >> 3 else 1 + valueref = {"control": control, "step_code": raw & 0x07} + value = DPTControl.from_knx(raw, invert=True) + self.assertEqual(value, valueref) + + def test_from_knx_wrong_value(self): + """Test parsing invalid DPTControl type from KNX.""" + with self.assertRaises(ConversionError): + DPTControl.from_knx(0x1F) + + def test_unit(self): + """Test unit_of_measurement function.""" + self.assertEqual(DPTControl.unit, "") + + +class TestDPTControlStepwise(unittest.TestCase): + """Test class for DPTControlStepwise objects.""" + + def test_to_knx(self): + """Test serializing values to DPTControlStepwise.""" + self.assertEqual(DPTControlStepwise.to_knx(1), 0xF) + self.assertEqual(DPTControlStepwise.to_knx(3), 0xE) + self.assertEqual(DPTControlStepwise.to_knx(6), 0xD) + self.assertEqual(DPTControlStepwise.to_knx(12), 0xC) + self.assertEqual(DPTControlStepwise.to_knx(25), 0xB) + self.assertEqual(DPTControlStepwise.to_knx(50), 0xA) + self.assertEqual(DPTControlStepwise.to_knx(100), 0x9) + self.assertEqual(DPTControlStepwise.to_knx(-1), 0x7) + self.assertEqual(DPTControlStepwise.to_knx(-3), 0x6) + self.assertEqual(DPTControlStepwise.to_knx(-6), 0x5) + self.assertEqual(DPTControlStepwise.to_knx(-12), 0x4) + self.assertEqual(DPTControlStepwise.to_knx(-25), 0x3) + self.assertEqual(DPTControlStepwise.to_knx(-50), 0x2) + self.assertEqual(DPTControlStepwise.to_knx(-100), 0x1) + self.assertEqual(DPTControlStepwise.to_knx(0), 0x0) + + def test_to_knx_wrong_type(self): + """Test serializing wrong type to DPTControlStepwise.""" + with self.assertRaises(ConversionError): + DPTControlStepwise.to_knx("") + + def test_from_knx(self): + """Test parsing DPTControlStepwise types from KNX.""" + self.assertEqual(DPTControlStepwise.from_knx(0xF), 1) + self.assertEqual(DPTControlStepwise.from_knx(0xE), 3) + self.assertEqual(DPTControlStepwise.from_knx(0xD), 6) + self.assertEqual(DPTControlStepwise.from_knx(0xC), 12) + self.assertEqual(DPTControlStepwise.from_knx(0xB), 25) + self.assertEqual(DPTControlStepwise.from_knx(0xA), 50) + self.assertEqual(DPTControlStepwise.from_knx(0x9), 100) + self.assertEqual(DPTControlStepwise.from_knx(0x8), 0) + self.assertEqual(DPTControlStepwise.from_knx(0x7), -1) + self.assertEqual(DPTControlStepwise.from_knx(0x6), -3) + self.assertEqual(DPTControlStepwise.from_knx(0x5), -6) + self.assertEqual(DPTControlStepwise.from_knx(0x4), -12) + self.assertEqual(DPTControlStepwise.from_knx(0x3), -25) + self.assertEqual(DPTControlStepwise.from_knx(0x2), -50) + self.assertEqual(DPTControlStepwise.from_knx(0x1), -100) + self.assertEqual(DPTControlStepwise.from_knx(0x0), 0) + + def test_from_knx_wrong_value(self): + """Test parsing invalid DPTControlStepwise type from KNX.""" + with self.assertRaises(ConversionError): + DPTControlStepwise.from_knx(0x1F) + + def test_unit(self): + """Test unit_of_measurement function.""" + self.assertEqual(DPTControlStepwise.unit, "%") + + +class TestDPTControlStartStop(unittest.TestCase): + """Test class for DPTControlStartStop objects.""" + + def test_mode_to_knx(self): + """Test serializing dimming commands to KNX.""" + self.assertEqual( + DPTControlStartStopDimming.to_knx( + DPTControlStartStopDimming.Direction.INCREASE + ), + 9, + ) + self.assertEqual( + DPTControlStartStopDimming.to_knx( + DPTControlStartStopDimming.Direction.DECREASE + ), + 1, + ) + self.assertEqual( + DPTControlStartStopDimming.to_knx( + DPTControlStartStopDimming.Direction.STOP + ), + 0, + ) + + def test_mode_to_knx_wrong_value(self): + """Test serializing invalid data type to KNX.""" + with self.assertRaises(ConversionError): + DPTControlStartStopDimming.to_knx(1) + + def test_mode_from_knx(self): + """Test parsing dimming commands from KNX.""" + for i in range(16): + if i > 8: + expected_direction = DPTControlStartStopDimming.Direction.INCREASE + elif i in (0, 8): + expected_direction = DPTControlStartStopDimming.Direction.STOP + elif i < 8: + expected_direction = DPTControlStartStopDimming.Direction.DECREASE + self.assertEqual(DPTControlStartStopDimming.from_knx(i), expected_direction) + + def test_mode_from_knx_wrong_value(self): + """Test serializing invalid data type to KNX.""" + with self.assertRaises(ConversionError): + DPTControlStartStopDimming.from_knx((1, 2)) + + def test_direction_names(self): + """Test names of Direction Enum.""" + self.assertEqual(str(DPTControlStartStop.Direction.INCREASE), "Increase") + self.assertEqual(str(DPTControlStartStop.Direction.DECREASE), "Decrease") + self.assertEqual(str(DPTControlStartStop.Direction.STOP), "Stop") + + +class TestDPTControlStartStopDimming(TestDPTControlStartStop): + """Test class for DPTControlStartStopDimming objects.""" + + def test_direction_names(self): + """Test names of Direction Enum.""" + self.assertEqual(str(DPTControlStartStopDimming.Direction.INCREASE), "Increase") + self.assertEqual(str(DPTControlStartStopDimming.Direction.DECREASE), "Decrease") + self.assertEqual(str(DPTControlStartStopDimming.Direction.STOP), "Stop") + + def test_direction_values(self): + """Test values of Direction Enum.""" + # pylint: disable=no-member + self.assertEqual( + DPTControlStartStopDimming.Direction.DECREASE.value, + DPTControlStartStop.Direction.DECREASE.value, + ) + self.assertEqual( + DPTControlStartStopDimming.Direction.INCREASE.value, + DPTControlStartStop.Direction.INCREASE.value, + ) + self.assertEqual( + DPTControlStartStopDimming.Direction.STOP.value, + DPTControlStartStop.Direction.STOP.value, + ) + + +class TestDPTControlStartStopBlinds(unittest.TestCase): + """Test class for DPTControlStartStopBlinds objects.""" + + def test_direction_names(self): + """Test names of Direction Enum.""" + self.assertEqual(str(DPTControlStartStopBlinds.Direction.DOWN), "Down") + self.assertEqual(str(DPTControlStartStopBlinds.Direction.UP), "Up") + self.assertEqual(str(DPTControlStartStopBlinds.Direction.STOP), "Stop") + + def test_direction_values(self): + """Test values of Direction Enum.""" + # pylint: disable=no-member + self.assertEqual( + DPTControlStartStopBlinds.Direction.UP.value, + DPTControlStartStop.Direction.DECREASE.value, + ) + self.assertEqual( + DPTControlStartStopBlinds.Direction.DOWN.value, + DPTControlStartStop.Direction.INCREASE.value, + ) + self.assertEqual( + DPTControlStartStopBlinds.Direction.STOP.value, + DPTControlStartStop.Direction.STOP.value, + ) diff --git a/test/remote_value_tests/remote_value_control_test.py b/test/remote_value_tests/remote_value_control_test.py new file mode 100644 index 0000000000..159e3931da --- /dev/null +++ b/test/remote_value_tests/remote_value_control_test.py @@ -0,0 +1,76 @@ +"""Unit test for RemoteValueControl objects.""" +import asyncio +import unittest + +from xknx import XKNX +from xknx.dpt import DPTArray, DPTBinary +from xknx.exceptions import ConversionError, CouldNotParseTelegram +from xknx.remote_value import RemoteValueControl +from xknx.telegram import GroupAddress, Telegram + + +class TestRemoteValueControl(unittest.TestCase): + """Test class for RemoteValueControl objects.""" + + def setUp(self): + """Set up test class.""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + """Tear down test class.""" + self.loop.close() + + def test_wrong_value_type(self): + """Test initializing with wrong value_type.""" + xknx = XKNX() + with self.assertRaises(ConversionError): + RemoteValueControl(xknx, value_type="wrong_value_type") + + def test_valid_payload(self): + """Test valid_payload method.""" + self.assertTrue(DPTBinary(0)) + self.assertTrue(DPTArray([0])) + + def test_set(self): + """Test setting value.""" + xknx = XKNX() + remote_value = RemoteValueControl( + xknx, group_address=GroupAddress("1/2/3"), value_type="stepwise" + ) + self.loop.run_until_complete(asyncio.Task(remote_value.set(25))) + self.assertEqual(xknx.telegrams.qsize(), 1) + telegram = xknx.telegrams.get_nowait() + self.assertEqual( + telegram, Telegram(GroupAddress("1/2/3"), payload=DPTBinary(0xB)) + ) + + def test_process(self): + """Test process telegram.""" + xknx = XKNX() + remote_value = RemoteValueControl( + xknx, group_address=GroupAddress("1/2/3"), value_type="stepwise" + ) + telegram = Telegram(group_address=GroupAddress("1/2/3"), payload=DPTBinary(0xB)) + self.assertEqual(remote_value.value, None) + self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) + self.assertEqual(remote_value.value, 25) + + def test_to_process_error(self): + """Test process errornous telegram.""" + xknx = XKNX() + remote_value = RemoteValueControl( + xknx, group_address=GroupAddress("1/2/3"), value_type="stepwise" + ) + with self.assertRaises(CouldNotParseTelegram): + telegram = Telegram( + group_address=GroupAddress("1/2/3"), payload=DPTArray(0x01) + ) + self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) + with self.assertRaises(ConversionError): + telegram = Telegram( + group_address=GroupAddress("1/2/3"), payload=DPTBinary(0x10) + ) + self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) + # pylint: disable=pointless-statement + remote_value.value diff --git a/xknx/dpt/__init__.py b/xknx/dpt/__init__.py index b5bfff829c..bb886b69b9 100644 --- a/xknx/dpt/__init__.py +++ b/xknx/dpt/__init__.py @@ -6,6 +6,15 @@ """ # flake8: noqa from .dpt import DPTArray, DPTBase, DPTBinary, DPTComparator +from .dpt_1byte_control import ( + DPTControl, + DPTControlStartStop, + DPTControlStartStopBlinds, + DPTControlStartStopDimming, + DPTControlStepwise, + DPTControlStepwiseBlinds, + DPTControlStepwiseDimming, +) from .dpt_1byte_signed import DPTPercentV8, DPTSignedRelativeValue, DPTValue1Count from .dpt_1byte_uint import ( DPTDecimalFactor, diff --git a/xknx/dpt/dpt_1byte_control.py b/xknx/dpt/dpt_1byte_control.py new file mode 100644 index 0000000000..3605d5c6f9 --- /dev/null +++ b/xknx/dpt/dpt_1byte_control.py @@ -0,0 +1,242 @@ +"""Implementation of Basic KNX DPT B1U3 Values (DPT 3.007/3.008). + +Very good source of information for interpretation of the standard is +https://library.e.abb.com/public/78c74aa86d4648b7b9d918485cd4621a/2CDC500051M0203_ApplicationHB_Lighting_EN.pdf#page=34 + +There are two separate dimming modes sharing the same DPT class: + + * Stepwise dimming + The full brightness range is divided into 2^(stepcode-1) intervals. + The value is always rounded to full interval boundary, i.e. 30% +25% = 50%, 50% +25% = 75%, 30% -25% = 25% + + * Start-stop dimming + Dimming is started with -/+100% (0x1/0x9) and keeps dimming until a STOP diagram (0x0/0x8) is received. + +As the same payload in these cases in interpreted completely different it is reasonable to make separate DPT classes. +""" +from enum import Enum + +from xknx.exceptions import ConversionError + +from .dpt import DPTBase + + +class DPTControl(DPTBase): + """Abstraction for KNX B1U3 values (DPT 3.007/3.008).""" + + # APCI (application layer control information) + APCI_CONTROLMASK = 0x08 + APCI_STEPCODEMASK = 0x07 + APCI_MAX_VALUE = APCI_CONTROLMASK | APCI_STEPCODEMASK + + payload_length = 1 + unit = "" + + @classmethod + def _encode(cls, control, step_code): + """Encode control-bit with step-code.""" + value = 1 if control > 0 else 0 + value = (value << 3) | (step_code & cls.APCI_STEPCODEMASK) + return value + + @classmethod + def _decode(cls, value): + """Decode value into control-bit and step-code.""" + control = 1 if (value & cls.APCI_CONTROLMASK) != 0 else 0 + step_code = value & cls.APCI_STEPCODEMASK + return control, step_code + + @classmethod + def _test_boundaries(cls, raw): + """Test if raw KNX data is within defined range for this object.""" + if isinstance(raw, int): + return 0 <= raw <= cls.APCI_MAX_VALUE + return False + + @classmethod + def _test_values(cls, control, step_code): + """Test if input values are valid.""" + if isinstance(control, int) and isinstance(step_code, int): + if control in (0, 1) and 0 <= step_code <= cls.APCI_STEPCODEMASK: + return True + return False + + @classmethod + def to_knx(cls, value, invert=False): + """Serialize to KNX/IP raw data.""" + if not isinstance(value, dict): + raise ConversionError( + "Cant serialize %s; invalid value type" % cls.__name__, value=value + ) + + try: + control = value["control"] + step_code = value["step_code"] + except KeyError: + raise ConversionError( + "Cant serialize %s; invalid keys" % cls.__name__, value=value + ) + + if not cls._test_values(control, step_code): + raise ConversionError( + "Cant serialize %s; invalid values" % cls.__name__, value=value + ) + + if invert: + control = 0 if control > 0 else 1 + + return cls._encode(control, step_code) + + @classmethod + def from_knx(cls, raw, invert=False): + """Parse/deserialize from KNX/IP raw data.""" + if not cls._test_boundaries(raw): + raise ConversionError("Cant parse %s" % cls.__name__, raw=raw) + + control, step_code = cls._decode(raw) + + if invert: + control = 0 if control > 0 else 1 + + return {"control": control, "step_code": step_code} + + +class DPTControlStepwise(DPTControl): + """Abstraction for KNX DPT 3.xxx in stepwise mode with conversion to an incement value.""" + + unit = "%" + + @staticmethod + def _from_increment(value): + """Calculate control bit and stepcode as defined in the KNX standard section 3.3.1 from an increment value.""" + # control bit in KNX standard + # 0: - = decrease/move up + # 1: + = increase/move down + control = 0 if value <= 0 else 1 + + stepcode = ( + 0 # special case = break indication (e.g. stop dimming/moving blinds) + ) + if abs(value) >= 100: + stepcode = 1 + elif abs(value) >= 50: + stepcode = 2 + elif abs(value) >= 25: + stepcode = 3 + elif abs(value) >= 12: + stepcode = 4 + elif abs(value) >= 6: + stepcode = 5 + elif abs(value) >= 3: + stepcode = 6 + elif abs(value) >= 1: + stepcode = 7 + + return {"control": control, "step_code": stepcode} + + @staticmethod + def _to_increment(value): + """Calculate the increment value from the stepcode and control bit as defined in the KNX standard section 3.3.1.""" + # calculated using floor(100/2^((value&0x07)-1)) + inc = [0, 100, 50, 25, 12, 6, 3, 1][value["step_code"] & 0x07] + return inc if value["control"] == 1 else -inc + + @classmethod + def to_knx(cls, value, invert=False): + """Serialize to KNX/IP raw data.""" + if not isinstance(value, int): + raise ConversionError("Cant serialize %s" % cls.__name__, value=value) + + return super().to_knx(cls._from_increment(value), invert) + + @classmethod + def from_knx(cls, raw, invert=False): + """Parse/deserialize from KNX/IP raw data.""" + return cls._to_increment(super().from_knx(raw, invert)) + + +class DPTControlStepwiseDimming(DPTControlStepwise): + """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in stepwise mode.""" + + +class DPTControlStepwiseBlinds(DPTControlStepwise): + """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in stepwise mode.""" + + +class TitleEnum(Enum): + """Enum with a descriptive string representation. + + Ensures values are rendered nicely, e.g. in home assistant. + """ + + def __str__(self): + """Return string representation.""" + # pylint: disable=no-member + return self.name.title() + + +class DPTControlStartStop(DPTControl): + """Abstraction for KNX DPT 3.xxx in start/stop mode.""" + + unit = "" + + class Direction(TitleEnum): + """Enum for indicating the direction.""" + + DECREASE = 0 + INCREASE = 1 + STOP = 2 + + @classmethod + def to_knx(cls, value, invert=False): + """Convert value to payload.""" + control = 0 + step_code = 0 + if value == cls.Direction(1): # INCREASE/DOWN + control = 1 + step_code = 1 + elif value == cls.Direction(0): # DECREASE/UP + control = 0 + step_code = 1 + elif value == cls.Direction(2): # STOP + control = 0 + step_code = 0 + else: + raise ConversionError("Cant serialize %s" % cls.__name__, value=value) + + values = {"control": control, "step_code": step_code} + return super().to_knx(values, invert) + + @classmethod + def from_knx(cls, raw, invert=False): + """Convert current payload to value.""" + values = super().from_knx(raw, invert) + if values["step_code"] == 0: + return cls.Direction(2) # STOP + if values["control"] == 0: + return cls.Direction(0) # DECREASE/UP + return cls.Direction(1) # INCREASE/DOWN + + +class DPTControlStartStopDimming(DPTControlStartStop): + """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in start/stop mode.""" + + # redefining Direction enum ensures proper typing, e.g. + # DPTControlStartStop.Direction.INCREASE != DPTControlStartStopDimming.Direction.INCREASE + class Direction(TitleEnum): + """Enum for indicating the direction.""" + + DECREASE = 0 + INCREASE = 1 + STOP = 2 + + +class DPTControlStartStopBlinds(DPTControlStartStop): + """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in start/stop mode.""" + + class Direction(TitleEnum): + """Enum for indicating the direction.""" + + UP = 0 + DOWN = 1 + STOP = 2 diff --git a/xknx/remote_value/__init__.py b/xknx/remote_value/__init__.py index 9ea8a0d690..a7d72b8fea 100644 --- a/xknx/remote_value/__init__.py +++ b/xknx/remote_value/__init__.py @@ -9,6 +9,7 @@ ) from .remote_value_color_rgb import RemoteValueColorRGB from .remote_value_color_rgbw import RemoteValueColorRGBW +from .remote_value_control import RemoteValueControl from .remote_value_datetime import RemoteValueDateTime from .remote_value_dpt_2_byte_unsigned import RemoteValueDpt2ByteUnsigned from .remote_value_dpt_value_1_ucount import RemoteValueDptValue1Ucount diff --git a/xknx/remote_value/remote_value_control.py b/xknx/remote_value/remote_value_control.py new file mode 100644 index 0000000000..e490410db0 --- /dev/null +++ b/xknx/remote_value/remote_value_control.py @@ -0,0 +1,71 @@ +""" +Module for managing a control remote value. + +Examples are switching commands with priority control, relative dimming or blinds control commands. +DPT 2.yyy and DPT 3.yyy +""" +from xknx.dpt import ( + DPTBinary, + DPTControl, + DPTControlStartStop, + DPTControlStartStopBlinds, + DPTControlStartStopDimming, + DPTControlStepwise, + DPTControlStepwiseBlinds, + DPTControlStepwiseDimming, +) +from xknx.exceptions import ConversionError + +from .remote_value import RemoteValue + + +class RemoteValueControl(RemoteValue): + """Abstraction for remote value used for controling.""" + + DPTMAP = { + "control": DPTControl, + "startstop": DPTControlStartStop, + "startstop_dimming": DPTControlStartStopDimming, + "startstop_blinds": DPTControlStartStopBlinds, + "stepwise": DPTControlStepwise, + "stepwise_dimming": DPTControlStepwiseDimming, + "stepwise_blinds": DPTControlStepwiseBlinds, + } + + def __init__( + self, + xknx, + group_address=None, + group_address_state=None, + value_type=None, + device_name=None, + after_update_cb=None, + invert=False, + ): + """Initialize control remote value.""" + # pylint: disable=too-many-arguments + super().__init__( + xknx, + group_address, + group_address_state, + device_name=device_name, + after_update_cb=after_update_cb, + ) + self.invert = invert + if value_type not in self.DPTMAP: + raise ConversionError( + "invalid value type", value_type=value_type, device_name=device_name + ) + self.value_type = value_type + + def payload_valid(self, payload): + """Test if telegram payload may be parsed.""" + return isinstance(payload, DPTBinary) + + def to_knx(self, value): + """Convert value to payload.""" + return DPTBinary(self.DPTMAP[self.value_type].to_knx(value, invert=self.invert)) + + def from_knx(self, payload): + """Convert current payload to value.""" + return self.DPTMAP[self.value_type].from_knx(payload.value, invert=self.invert) From cda5540bc3f87d51033d38f4415b8c13ebd15c7f Mon Sep 17 00:00:00 2001 From: buergi Date: Sat, 21 Nov 2020 21:40:07 +0100 Subject: [PATCH 4/7] Update DPT mapping in RemoteValueControl --- xknx/dpt/dpt_1byte_control.py | 19 +++++++++++++- xknx/remote_value/remote_value_control.py | 31 +++++------------------ 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/xknx/dpt/dpt_1byte_control.py b/xknx/dpt/dpt_1byte_control.py index 3605d5c6f9..6ef475060d 100644 --- a/xknx/dpt/dpt_1byte_control.py +++ b/xknx/dpt/dpt_1byte_control.py @@ -29,8 +29,9 @@ class DPTControl(DPTBase): APCI_STEPCODEMASK = 0x07 APCI_MAX_VALUE = APCI_CONTROLMASK | APCI_STEPCODEMASK - payload_length = 1 + value_type = "control" unit = "" + payload_length = 1 @classmethod def _encode(cls, control, step_code): @@ -104,6 +105,9 @@ def from_knx(cls, raw, invert=False): class DPTControlStepwise(DPTControl): """Abstraction for KNX DPT 3.xxx in stepwise mode with conversion to an incement value.""" + dpt_main_number = 3 + dpt_sub_number = None + value_type = "stepwise" unit = "%" @staticmethod @@ -158,10 +162,18 @@ def from_knx(cls, raw, invert=False): class DPTControlStepwiseDimming(DPTControlStepwise): """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in stepwise mode.""" + dpt_main_number = 3 + dpt_sub_number = 7 + value_type = "stepwise_dimming" + class DPTControlStepwiseBlinds(DPTControlStepwise): """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in stepwise mode.""" + dpt_main_number = 3 + dpt_sub_number = 8 + value_type = "stepwise_blinds" + class TitleEnum(Enum): """Enum with a descriptive string representation. @@ -178,6 +190,7 @@ def __str__(self): class DPTControlStartStop(DPTControl): """Abstraction for KNX DPT 3.xxx in start/stop mode.""" + value_type = "startstop" unit = "" class Direction(TitleEnum): @@ -221,6 +234,8 @@ def from_knx(cls, raw, invert=False): class DPTControlStartStopDimming(DPTControlStartStop): """Abstraction for KNX DPT 3.007 / DPT_Control_Dimming in start/stop mode.""" + value_type = "startstop_dimming" + # redefining Direction enum ensures proper typing, e.g. # DPTControlStartStop.Direction.INCREASE != DPTControlStartStopDimming.Direction.INCREASE class Direction(TitleEnum): @@ -234,6 +249,8 @@ class Direction(TitleEnum): class DPTControlStartStopBlinds(DPTControlStartStop): """Abstraction for KNX DPT 3.008 / DPT_Control_Blinds in start/stop mode.""" + value_type = "startstop_blinds" + class Direction(TitleEnum): """Enum for indicating the direction.""" diff --git a/xknx/remote_value/remote_value_control.py b/xknx/remote_value/remote_value_control.py index e490410db0..30765b71d4 100644 --- a/xknx/remote_value/remote_value_control.py +++ b/xknx/remote_value/remote_value_control.py @@ -4,16 +4,7 @@ Examples are switching commands with priority control, relative dimming or blinds control commands. DPT 2.yyy and DPT 3.yyy """ -from xknx.dpt import ( - DPTBinary, - DPTControl, - DPTControlStartStop, - DPTControlStartStopBlinds, - DPTControlStartStopDimming, - DPTControlStepwise, - DPTControlStepwiseBlinds, - DPTControlStepwiseDimming, -) +from xknx.dpt import DPTBase, DPTBinary from xknx.exceptions import ConversionError from .remote_value import RemoteValue @@ -22,16 +13,6 @@ class RemoteValueControl(RemoteValue): """Abstraction for remote value used for controling.""" - DPTMAP = { - "control": DPTControl, - "startstop": DPTControlStartStop, - "startstop_dimming": DPTControlStartStopDimming, - "startstop_blinds": DPTControlStartStopBlinds, - "stepwise": DPTControlStepwise, - "stepwise_dimming": DPTControlStepwiseDimming, - "stepwise_blinds": DPTControlStepwiseBlinds, - } - def __init__( self, xknx, @@ -52,11 +33,13 @@ def __init__( after_update_cb=after_update_cb, ) self.invert = invert - if value_type not in self.DPTMAP: + # pylint: disable=too-many-arguments + _dpt_class = DPTBase.parse_transcoder(value_type) + if _dpt_class is None: raise ConversionError( "invalid value type", value_type=value_type, device_name=device_name ) - self.value_type = value_type + self.dpt_class = _dpt_class def payload_valid(self, payload): """Test if telegram payload may be parsed.""" @@ -64,8 +47,8 @@ def payload_valid(self, payload): def to_knx(self, value): """Convert value to payload.""" - return DPTBinary(self.DPTMAP[self.value_type].to_knx(value, invert=self.invert)) + return DPTBinary(self.dpt_class.to_knx(value, invert=self.invert)) def from_knx(self, payload): """Convert current payload to value.""" - return self.DPTMAP[self.value_type].from_knx(payload.value, invert=self.invert) + return self.dpt_class.from_knx(payload.value, invert=self.invert) From 52398edc143eecb433640a06b82297ccbbfeb017 Mon Sep 17 00:00:00 2001 From: buergi Date: Wed, 9 Dec 2020 00:20:29 +0100 Subject: [PATCH 5/7] Accept some suggested changes --- ...ntrol_test.py => dpt_4bit_control_test.py} | 60 ++++++++++--------- xknx/dpt/__init__.py | 18 +++--- ...t_1byte_control.py => dpt_4bit_control.py} | 27 ++++----- 3 files changed, 52 insertions(+), 53 deletions(-) rename test/dpt_tests/{dpt_1byte_control_test.py => dpt_4bit_control_test.py} (84%) rename xknx/dpt/{dpt_1byte_control.py => dpt_4bit_control.py} (91%) diff --git a/test/dpt_tests/dpt_1byte_control_test.py b/test/dpt_tests/dpt_4bit_control_test.py similarity index 84% rename from test/dpt_tests/dpt_1byte_control_test.py rename to test/dpt_tests/dpt_4bit_control_test.py index 3ce68640b3..06cd309605 100644 --- a/test/dpt_tests/dpt_1byte_control_test.py +++ b/test/dpt_tests/dpt_4bit_control_test.py @@ -1,91 +1,93 @@ -"""Unit test for DPTControl objects.""" +"""Unit test for DPTControlStepCode objects.""" import unittest from xknx.dpt import ( - DPTControl, DPTControlStartStop, DPTControlStartStopBlinds, DPTControlStartStopDimming, + DPTControlStepCode, DPTControlStepwise, ) from xknx.exceptions import ConversionError -class TestDPTControl(unittest.TestCase): - """Test class for DPTControl objects.""" +class TestDPTControlStepCode(unittest.TestCase): + """Test class for DPTControlStepCode objects.""" def test_to_knx(self): - """Test serializing values to DPTControl.""" + """Test serializing values to DPTControlStepCode.""" for rawref in range(16): control = 1 if rawref >> 3 else 0 - raw = DPTControl.to_knx({"control": control, "step_code": rawref & 0x07}) + raw = DPTControlStepCode.to_knx( + {"control": control, "step_code": rawref & 0x07} + ) self.assertEqual(raw, rawref) def test_to_knx_inverted(self): - """Test serializing values to DPTControl in inverted mode.""" + """Test serializing values to DPTControlStepCode in inverted mode.""" for rawref in range(16): control = 0 if rawref >> 3 else 1 - raw = DPTControl.to_knx( + raw = DPTControlStepCode.to_knx( {"control": control, "step_code": rawref & 0x07}, invert=True ) self.assertEqual(raw, rawref) def test_to_knx_wrong_type(self): - """Test serializing wrong type to DPTControl.""" + """Test serializing wrong type to DPTControlStepCode.""" with self.assertRaises(ConversionError): - DPTControl.to_knx("") + DPTControlStepCode.to_knx("") with self.assertRaises(ConversionError): - DPTControl.to_knx(0) + DPTControlStepCode.to_knx(0) def test_to_knx_wrong_keys(self): - """Test serializing map with missing keys to DPTControl.""" + """Test serializing map with missing keys to DPTControlStepCode.""" with self.assertRaises(ConversionError): - DPTControl.to_knx({"control": 0}) + DPTControlStepCode.to_knx({"control": 0}) with self.assertRaises(ConversionError): - DPTControl.to_knx({"step_code": 0}) + DPTControlStepCode.to_knx({"step_code": 0}) def test_to_knx_wrong_value_types(self): - """Test serializing map with keys of invalid type to DPTControl.""" + """Test serializing map with keys of invalid type to DPTControlStepCode.""" with self.assertRaises(ConversionError): - DPTControl.to_knx({"control": ""}) + DPTControlStepCode.to_knx({"control": ""}) with self.assertRaises(ConversionError): - DPTControl.to_knx({"step_code": ""}) + DPTControlStepCode.to_knx({"step_code": ""}) def test_to_knx_wrong_values(self): - """Test serializing map with keys of invalid values to DPTControl.""" + """Test serializing map with keys of invalid values to DPTControlStepCode.""" with self.assertRaises(ConversionError): - DPTControl.to_knx({"control": -1, "step_code": 0}) + DPTControlStepCode.to_knx({"control": -1, "step_code": 0}) with self.assertRaises(ConversionError): - DPTControl.to_knx({"control": 2, "step_code": 0}) + DPTControlStepCode.to_knx({"control": 2, "step_code": 0}) with self.assertRaises(ConversionError): - DPTControl.to_knx({"control": 0, "step_code": -1}) + DPTControlStepCode.to_knx({"control": 0, "step_code": -1}) with self.assertRaises(ConversionError): - DPTControl.to_knx({"control": 0, "step_code": 0x8}) + DPTControlStepCode.to_knx({"control": 0, "step_code": 0x8}) def test_from_knx(self): - """Test parsing DPTControl types from KNX.""" + """Test parsing DPTControlStepCode types from KNX.""" for raw in range(16): control = 1 if raw >> 3 else 0 valueref = {"control": control, "step_code": raw & 0x07} - value = DPTControl.from_knx(raw) + value = DPTControlStepCode.from_knx(raw) self.assertEqual(value, valueref) def test_from_knx_inverted(self): - """Test parsing DPTControl types from KNX.""" + """Test parsing DPTControlStepCode types from KNX.""" for raw in range(16): control = 0 if raw >> 3 else 1 valueref = {"control": control, "step_code": raw & 0x07} - value = DPTControl.from_knx(raw, invert=True) + value = DPTControlStepCode.from_knx(raw, invert=True) self.assertEqual(value, valueref) def test_from_knx_wrong_value(self): - """Test parsing invalid DPTControl type from KNX.""" + """Test parsing invalid DPTControlStepCode type from KNX.""" with self.assertRaises(ConversionError): - DPTControl.from_knx(0x1F) + DPTControlStepCode.from_knx(0x1F) def test_unit(self): """Test unit_of_measurement function.""" - self.assertEqual(DPTControl.unit, "") + self.assertEqual(DPTControlStepCode.unit, "") class TestDPTControlStepwise(unittest.TestCase): diff --git a/xknx/dpt/__init__.py b/xknx/dpt/__init__.py index bb886b69b9..9eaca4c1af 100644 --- a/xknx/dpt/__init__.py +++ b/xknx/dpt/__init__.py @@ -6,15 +6,6 @@ """ # flake8: noqa from .dpt import DPTArray, DPTBase, DPTBinary, DPTComparator -from .dpt_1byte_control import ( - DPTControl, - DPTControlStartStop, - DPTControlStartStopBlinds, - DPTControlStartStopDimming, - DPTControlStepwise, - DPTControlStepwiseBlinds, - DPTControlStepwiseDimming, -) from .dpt_1byte_signed import DPTPercentV8, DPTSignedRelativeValue, DPTValue1Count from .dpt_1byte_uint import ( DPTDecimalFactor, @@ -71,6 +62,15 @@ DPTTimePeriodSec, DPTUElCurrentmA, ) +from .dpt_4bit_control import ( + DPTControlStartStop, + DPTControlStartStopBlinds, + DPTControlStartStopDimming, + DPTControlStepCode, + DPTControlStepwise, + DPTControlStepwiseBlinds, + DPTControlStepwiseDimming, +) from .dpt_4byte_float import ( DPT4ByteFloat, DPTAbsoluteTemperature, diff --git a/xknx/dpt/dpt_1byte_control.py b/xknx/dpt/dpt_4bit_control.py similarity index 91% rename from xknx/dpt/dpt_1byte_control.py rename to xknx/dpt/dpt_4bit_control.py index 6ef475060d..e18627ed73 100644 --- a/xknx/dpt/dpt_1byte_control.py +++ b/xknx/dpt/dpt_4bit_control.py @@ -1,8 +1,5 @@ """Implementation of Basic KNX DPT B1U3 Values (DPT 3.007/3.008). -Very good source of information for interpretation of the standard is -https://library.e.abb.com/public/78c74aa86d4648b7b9d918485cd4621a/2CDC500051M0203_ApplicationHB_Lighting_EN.pdf#page=34 - There are two separate dimming modes sharing the same DPT class: * Stepwise dimming @@ -21,7 +18,7 @@ from .dpt import DPTBase -class DPTControl(DPTBase): +class DPTControlStepCode(DPTBase): """Abstraction for KNX B1U3 values (DPT 3.007/3.008).""" # APCI (application layer control information) @@ -34,14 +31,14 @@ class DPTControl(DPTBase): payload_length = 1 @classmethod - def _encode(cls, control, step_code): + def _encode(cls, control: bool, step_code: int): """Encode control-bit with step-code.""" value = 1 if control > 0 else 0 value = (value << 3) | (step_code & cls.APCI_STEPCODEMASK) return value @classmethod - def _decode(cls, value): + def _decode(cls, value) -> tuple[bool, int]: """Decode value into control-bit and step-code.""" control = 1 if (value & cls.APCI_CONTROLMASK) != 0 else 0 step_code = value & cls.APCI_STEPCODEMASK @@ -55,7 +52,7 @@ def _test_boundaries(cls, raw): return False @classmethod - def _test_values(cls, control, step_code): + def _test_values(cls, control: bool, step_code: int): """Test if input values are valid.""" if isinstance(control, int) and isinstance(step_code, int): if control in (0, 1) and 0 <= step_code <= cls.APCI_STEPCODEMASK: @@ -63,7 +60,7 @@ def _test_values(cls, control, step_code): return False @classmethod - def to_knx(cls, value, invert=False): + def to_knx(cls, value, invert: bool = False): """Serialize to KNX/IP raw data.""" if not isinstance(value, dict): raise ConversionError( @@ -89,7 +86,7 @@ def to_knx(cls, value, invert=False): return cls._encode(control, step_code) @classmethod - def from_knx(cls, raw, invert=False): + def from_knx(cls, raw, invert: bool = False): """Parse/deserialize from KNX/IP raw data.""" if not cls._test_boundaries(raw): raise ConversionError("Cant parse %s" % cls.__name__, raw=raw) @@ -102,7 +99,7 @@ def from_knx(cls, raw, invert=False): return {"control": control, "step_code": step_code} -class DPTControlStepwise(DPTControl): +class DPTControlStepwise(DPTControlStepCode): """Abstraction for KNX DPT 3.xxx in stepwise mode with conversion to an incement value.""" dpt_main_number = 3 @@ -146,7 +143,7 @@ def _to_increment(value): return inc if value["control"] == 1 else -inc @classmethod - def to_knx(cls, value, invert=False): + def to_knx(cls, value, invert: bool = False): """Serialize to KNX/IP raw data.""" if not isinstance(value, int): raise ConversionError("Cant serialize %s" % cls.__name__, value=value) @@ -154,7 +151,7 @@ def to_knx(cls, value, invert=False): return super().to_knx(cls._from_increment(value), invert) @classmethod - def from_knx(cls, raw, invert=False): + def from_knx(cls, raw, invert: bool = False): """Parse/deserialize from KNX/IP raw data.""" return cls._to_increment(super().from_knx(raw, invert)) @@ -187,7 +184,7 @@ def __str__(self): return self.name.title() -class DPTControlStartStop(DPTControl): +class DPTControlStartStop(DPTControlStepCode): """Abstraction for KNX DPT 3.xxx in start/stop mode.""" value_type = "startstop" @@ -201,7 +198,7 @@ class Direction(TitleEnum): STOP = 2 @classmethod - def to_knx(cls, value, invert=False): + def to_knx(cls, value, invert: bool = False): """Convert value to payload.""" control = 0 step_code = 0 @@ -221,7 +218,7 @@ def to_knx(cls, value, invert=False): return super().to_knx(values, invert) @classmethod - def from_knx(cls, raw, invert=False): + def from_knx(cls, raw, invert: bool = False): """Convert current payload to value.""" values = super().from_knx(raw, invert) if values["step_code"] == 0: From 069ca50e89d8fb52ab5c0afd9726d67681ef66b8 Mon Sep 17 00:00:00 2001 From: buergi Date: Wed, 9 Dec 2020 01:18:46 +0100 Subject: [PATCH 6/7] Fix tuple type hint for python < 3.9 --- xknx/dpt/dpt_4bit_control.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xknx/dpt/dpt_4bit_control.py b/xknx/dpt/dpt_4bit_control.py index e18627ed73..a0e32a2715 100644 --- a/xknx/dpt/dpt_4bit_control.py +++ b/xknx/dpt/dpt_4bit_control.py @@ -12,6 +12,7 @@ As the same payload in these cases in interpreted completely different it is reasonable to make separate DPT classes. """ from enum import Enum +from typing import Tuple from xknx.exceptions import ConversionError @@ -38,7 +39,7 @@ def _encode(cls, control: bool, step_code: int): return value @classmethod - def _decode(cls, value) -> tuple[bool, int]: + def _decode(cls, value) -> Tuple[bool, int]: """Decode value into control-bit and step-code.""" control = 1 if (value & cls.APCI_CONTROLMASK) != 0 else 0 step_code = value & cls.APCI_STEPCODEMASK From 5d1e3f8ac9d665f229fef8711b5d402ea87e9489 Mon Sep 17 00:00:00 2001 From: buergi Date: Wed, 9 Dec 2020 01:31:37 +0100 Subject: [PATCH 7/7] Fix keyword argument --- test/remote_value_tests/remote_value_control_test.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test/remote_value_tests/remote_value_control_test.py b/test/remote_value_tests/remote_value_control_test.py index 159e3931da..16bd09662a 100644 --- a/test/remote_value_tests/remote_value_control_test.py +++ b/test/remote_value_tests/remote_value_control_test.py @@ -42,7 +42,8 @@ def test_set(self): self.assertEqual(xknx.telegrams.qsize(), 1) telegram = xknx.telegrams.get_nowait() self.assertEqual( - telegram, Telegram(GroupAddress("1/2/3"), payload=DPTBinary(0xB)) + telegram, + Telegram(destination_address=GroupAddress("1/2/3"), payload=DPTBinary(0xB)), ) def test_process(self): @@ -51,7 +52,9 @@ def test_process(self): remote_value = RemoteValueControl( xknx, group_address=GroupAddress("1/2/3"), value_type="stepwise" ) - telegram = Telegram(group_address=GroupAddress("1/2/3"), payload=DPTBinary(0xB)) + telegram = Telegram( + destination_address=GroupAddress("1/2/3"), payload=DPTBinary(0xB) + ) self.assertEqual(remote_value.value, None) self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) self.assertEqual(remote_value.value, 25) @@ -64,12 +67,12 @@ def test_to_process_error(self): ) with self.assertRaises(CouldNotParseTelegram): telegram = Telegram( - group_address=GroupAddress("1/2/3"), payload=DPTArray(0x01) + destination_address=GroupAddress("1/2/3"), payload=DPTArray(0x01) ) self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) with self.assertRaises(ConversionError): telegram = Telegram( - group_address=GroupAddress("1/2/3"), payload=DPTBinary(0x10) + destination_address=GroupAddress("1/2/3"), payload=DPTBinary(0x10) ) self.loop.run_until_complete(asyncio.Task(remote_value.process(telegram))) # pylint: disable=pointless-statement