Skip to content

Commit

Permalink
Improve code quality (mjg59#585)
Browse files Browse the repository at this point in the history
* Improve docstrings

* Fix line-too-long

* Disable unidiomatic-typecheck

* Move smart plugs to the top

* Use constants from const.py

* Run black
  • Loading branch information
felipediel committed Sep 12, 2021
1 parent 239dfc5 commit 63411ba
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 130 deletions.
8 changes: 4 additions & 4 deletions broadlink/climate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Support for climate control."""
"""Support for HVAC units."""
import typing as t

from . import exceptions as e
Expand Down Expand Up @@ -106,8 +106,8 @@ def get_full_status(self) -> dict:
# Manual mode will activate last used temperature.
# In typical usage call set_temp to activate manual control and set temp.
# loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ]
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule
# loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
# The sensor command is currently experimental
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
"""Set the mode of the device."""
Expand All @@ -124,7 +124,7 @@ def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
# Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C
# Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down,
# 1 for anti-freezing function open. Factory default: 0
# Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0
# Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0
def set_advanced(
self,
loop_mode: int,
Expand Down
9 changes: 7 additions & 2 deletions broadlink/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_RETRY_INTVL, DEFAULT_TIMEOUT
from .const import (
DEFAULT_BCAST_ADDR,
DEFAULT_PORT,
DEFAULT_RETRY_INTVL,
DEFAULT_TIMEOUT,
)
from .protocol import Datetime

HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
Expand Down Expand Up @@ -48,7 +53,7 @@ def scan(
try:
while (time.time() - start_time) < timeout:
time_left = timeout - (time.time() - start_time)
conn.settimeout(min(1, time_left))
conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left))
conn.sendto(packet, (discover_ip_address, discover_ip_port))

while True:
Expand Down
1 change: 1 addition & 0 deletions broadlink/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __str__(self):

def __eq__(self, other):
"""Return self==value."""
# pylint: disable=unidiomatic-typecheck
return type(self) == type(other) and self.args == other.args

def __hash__(self):
Expand Down
1 change: 1 addition & 0 deletions broadlink/protocol.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""The networking part of the python-broadlink library."""
import datetime as dt
import time

Expand Down
248 changes: 124 additions & 124 deletions broadlink/switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,130 +6,6 @@
from .device import Device


class mp1(Device):
"""Controls a Broadlink MP1."""

TYPE = "MP1"

def set_power_mask(self, sid_mask: int, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0x00] = 0x0D
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask)
packet[0x07] = 0xC0
packet[0x08] = 0x02
packet[0x0A] = 0x03
packet[0x0D] = sid_mask
packet[0x0E] = sid_mask if pwr else 0

response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])

def set_power(self, sid: int, pwr: bool) -> None:
"""Set the power state of the device."""
sid_mask = 0x01 << (sid - 1)
self.set_power_mask(sid_mask, pwr)

def check_power_raw(self) -> int:
"""Return the power state of the device in raw format."""
packet = bytearray(16)
packet[0x00] = 0x0A
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xAE
packet[0x07] = 0xC0
packet[0x08] = 0x01

response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x0E]

def check_power(self) -> dict:
"""Return the power state of the device."""
data = self.check_power_raw()
return {
"s1": bool(data & 1),
"s2": bool(data & 2),
"s3": bool(data & 4),
"s4": bool(data & 8),
}


class bg1(Device):
"""Controls a BG Electrical smart outlet."""

TYPE = "BG1"

def get_state(self) -> dict:
"""Return the power state of the device.
Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)

def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
maxworktime: int = None,
maxworktime1: int = None,
maxworktime2: int = None,
idcbrightness: int = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if pwr1 is not None:
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
state["pwr2"] = int(bool(pwr2))
if maxworktime is not None:
state["maxworktime"] = maxworktime
if maxworktime1 is not None:
state["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
state["maxworktime2"] = maxworktime2
if idcbrightness is not None:
state["idcbrightness"] = idcbrightness

packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
packet = bytearray(14)
data = json.dumps(state).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> dict:
"""Decode a message."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
return state


class sp1(Device):
"""Controls a Broadlink SP1."""

Expand Down Expand Up @@ -360,3 +236,127 @@ def _decode(self, response: bytes) -> dict:
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
return state


class bg1(Device):
"""Controls a BG Electrical smart outlet."""

TYPE = "BG1"

def get_state(self) -> dict:
"""Return the power state of the device.
Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)

def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
maxworktime: int = None,
maxworktime1: int = None,
maxworktime2: int = None,
idcbrightness: int = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if pwr1 is not None:
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
state["pwr2"] = int(bool(pwr2))
if maxworktime is not None:
state["maxworktime"] = maxworktime
if maxworktime1 is not None:
state["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
state["maxworktime2"] = maxworktime2
if idcbrightness is not None:
state["idcbrightness"] = idcbrightness

packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
packet = bytearray(14)
data = json.dumps(state).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> dict:
"""Decode a message."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
return state


class mp1(Device):
"""Controls a Broadlink MP1."""

TYPE = "MP1"

def set_power_mask(self, sid_mask: int, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0x00] = 0x0D
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask)
packet[0x07] = 0xC0
packet[0x08] = 0x02
packet[0x0A] = 0x03
packet[0x0D] = sid_mask
packet[0x0E] = sid_mask if pwr else 0

response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])

def set_power(self, sid: int, pwr: bool) -> None:
"""Set the power state of the device."""
sid_mask = 0x01 << (sid - 1)
self.set_power_mask(sid_mask, pwr)

def check_power_raw(self) -> int:
"""Return the power state of the device in raw format."""
packet = bytearray(16)
packet[0x00] = 0x0A
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xAE
packet[0x07] = 0xC0
packet[0x08] = 0x01

response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x0E]

def check_power(self) -> dict:
"""Return the power state of the device."""
data = self.check_power_raw()
return {
"s1": bool(data & 1),
"s2": bool(data & 2),
"s3": bool(data & 4),
"s4": bool(data & 8),
}

0 comments on commit 63411ba

Please sign in to comment.