Skip to content

Commit

Permalink
Release v0.3.0
Browse files Browse the repository at this point in the history
- dedicated submodule for exceptions (still exposed at module level through
  `aioairq.__all__`)

- `core.DeviceInfo`:
    - room_type -> suggested_area to further consistency with home assistant
    - all fields, except for `id` are optional (much like with
    `homeassistant.helpers.entity.DeviceInfo`)

- `core.AirQ.get`:
  - limited to an explicit set of queries / webserver routes
  (namely `AirQ._supported_routes = ["log", "config", "data", "average", "ping"]).
  Other routes return objects with different structure, which aren't consistent
  with the current decoding steps
  - Error handling for JSONDecodeError and KeyError, which ought not to happen
  with the aforementioned routes (added as a precaution against unexpected firmware
  behaviour)

- `encrypt.AESCipher`:
  - failed authentication is now inferred as close to the point of failure as possible.
    The success or failure of the authentication is based on the ability to decode
    the response from the device, thus the error `InvalidAuth` is raised
    in `AESCipher.decode`
  • Loading branch information
Sibgatulin committed Mar 7, 2023
1 parent 9f95608 commit c3ec577
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 35 deletions.
7 changes: 4 additions & 3 deletions aioairq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
__email__ = "daniel.lehmann@corant.de"
__url__ = "https://www.air-q.com"
__license__ = "Apache License 2.0"
__version__ = "0.2.4"
__all__ = ["AirQ", "DeviceInfo", "InvalidAuth", "InvalidInput"]
__version__ = "0.3.0"
__all__ = ["AirQ", "DeviceInfo", "InvalidAuth", "InvalidInput", "InvalidAirQResponse"]

from aioairq.core import AirQ, DeviceInfo, InvalidAuth, InvalidInput
from aioairq.core import AirQ, DeviceInfo
from aioairq.exceptions import InvalidAirQResponse, InvalidAuth, InvalidInput
77 changes: 48 additions & 29 deletions aioairq/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@
import aiohttp

from aioairq.encrypt import AESCipher
from aioairq.exceptions import InvalidAirQResponse, InvalidInput


class DeviceInfo(TypedDict):
"""Container for device information"""

id: str
name: str
model: str
sw_version: str
hw_version: str
name: str | None
model: str | None
suggested_area: str | None
sw_version: str | None
hw_version: str | None


class AirQ:
_supported_routes = ["config", "log", "data", "average", "ping"]

def __init__(
self,
address: str,
Expand Down Expand Up @@ -70,48 +74,71 @@ async def validate(self) -> None:
"""Test if the password provided to the constructor is valid.
Raises InvalidAuth if the password is not correct.
This method is a workaround, as currently the device does not support
authentication. This module infers the success of failure of the
authentication based on the ability to decode the response from the device.
This is merely a convenience function, relying on the exception being
raised down the stack (namely by AESCipher.decode from within self.get)
"""
try:
await self.get("ping")
except UnicodeDecodeError:
raise InvalidAuth
await self.get("ping")

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.address})"

async def fetch_device_info(self) -> DeviceInfo:
"""Fetch condensed device description"""
config = await self.get("config")
config: dict = await self.get("config")
room_type = config.get("RoomType")

try:
# The only required field. Should not really be missing, just a precaution
device_id = config["id"]
except KeyError:
raise InvalidAirQResponse

return DeviceInfo(
id=config["id"],
name=config["devicename"],
model=config["type"],
sw_version=config["air-Q-Software-Version"],
hw_version=config["air-Q-Hardware-Version"],
id=device_id,
name=config.get("devicename"),
model=config.get("type"),
suggested_area=room_type.replace("-", " ").title() if room_type else None,
sw_version=config.get("air-Q-Software-Version"),
hw_version=config.get("air-Q-Hardware-Version"),
)

@staticmethod
def drop_uncertainties_from_data(data: dict) -> dict:
"""Filter returned dict and substitute (value, uncertainty) with the value.
"""Filter returned dict and substitute [value, uncertainty] with the value.
The device attempts to estimate the uncertainty, or error, of certain readings.
These readings are returned as tuples of (value, uncertainty). Often, the latter
is not desired, and this is a convenience method to homogenise the dict a little
"""
return {k: v[0] if isinstance(v, list) else v for k, v in data.items()}
# `if v else None` is a precaution for the case of v being an empty list
# (which ought not to happen really...)
return {
k: (v[0] if v else None) if isinstance(v, list) else v
for k, v in data.items()
}

async def get(self, subject: str) -> dict:
"""Return the given subject from the air-Q device"""
if subject not in self._supported_routes:
raise NotImplementedError(
f"subject must be in {self._supported_routes}, got {subject}"
)

async with self._session.get(
f"{self.anchor}/{subject}", timeout=self._timeout
) as response:
html = await response.text()

try:
encoded_message = json.loads(html)["content"]
return json.loads(self.aes.decode(encoded_message))
except (json.JSONDecodeError, KeyError):
raise InvalidAirQResponse(
"AirQ.get() is currently limited to a set of requests, "
f"returning a dict with a key 'content' (namely {self._supported_routes}). "
f"AirQ.get({subject}) returned {html}"
)

return json.loads(self.aes.decode(encoded_message))

@property
async def data(self):
Expand All @@ -124,11 +151,3 @@ async def average(self):
@property
async def config(self):
return await self.get("config")


class InvalidAuth(Exception):
"""Error to indicate there is invalid auth."""


class InvalidInput(Exception):
"""Error to indicate there is invalid auth."""
13 changes: 10 additions & 3 deletions aioairq/encrypt.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Module concerned with encryption of the data"""
import base64

from Crypto import Random
from Crypto.Cipher import AES

from aioairq.exceptions import InvalidAuth

class AESCipher:

class AESCipher:
_bs = AES.block_size # 16

def __init__(self, passw: str):
Expand All @@ -29,7 +29,14 @@ def decode(self, encrypted: bytes) -> str:
iv = decoded[: self._bs]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(decoded[self._bs :])
return self._unpad(decrypted.decode("utf-8"))
try:
# Currently the device does not support proper authentication.
# The success or failure of the authentication based on the ability
# to decode the response from the device.
decoded = decrypted.decode("utf-8")
except UnicodeDecodeError:
raise InvalidAuth("Failed to decode a message. Incorrect password")
return self._unpad(decoded)

@staticmethod
def _unpad(data: str) -> str:
Expand Down
10 changes: 10 additions & 0 deletions aioairq/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class InvalidAuth(Exception):
"""Error to indicate an authentication failure."""


class InvalidInput(Exception):
"""Error to indicate the device ID / IP is invalid."""


class InvalidAirQResponse(Exception):
"""Error to indicate incorrect / unexpected response from the device"""

0 comments on commit c3ec577

Please sign in to comment.