Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

All notable changes to this project will be documented in this file.

## [0.2.6] - 2025-08-06

### Added

- Added redaction of data in exceptions when requesting `status()`
- Additional settings in dataclass (HA Core Issue 150118)
- Added 'likely' mocked fixture for above issue
- Added additional devices (see [Contributing](CONTRIBUTE.md) for more information)

### Changed

- Changed name and kwargs for discovery function

## [0.2.5] - 2025-08-05

### Added
Expand Down
2 changes: 2 additions & 0 deletions CONTRIBUTE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ It would be very helpful if you would share your configuration data to this proj
We currently have data on

- Nanostation 5AC (LOCO5AC) - PTP - both AP and Station output of `/status.cgi` present (by @CoMPaTech)
- Nanobeam 5AC - PTMP - Station (by @PlayFaster)
- LiteAP GPS - PTMP - AP (by @PlayFaster)

## Secure your data

Expand Down
42 changes: 29 additions & 13 deletions airos/airos8.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import aiohttp
from mashumaro.exceptions import InvalidFieldValue, MissingField

from .data import AirOS8Data as AirOSData
from .data import AirOS8Data as AirOSData, redact_data_smart
from .exceptions import (
AirOSConnectionAuthenticationError,
AirOSConnectionSetupError,
Expand Down Expand Up @@ -268,28 +268,44 @@ async def status(self) -> AirOSData:
if response.status == 200:
try:
response_json = json.loads(response_text)
try:
adjusted_json = self.derived_data(response_json)
airos_data = AirOSData.from_dict(adjusted_json)
except (MissingField, InvalidFieldValue) as err:
_LOGGER.exception("Failed to deserialize AirOS data")
raise AirOSKeyDataMissingError from err

return airos_data
adjusted_json = self.derived_data(response_json)
airos_data = AirOSData.from_dict(adjusted_json)
except InvalidFieldValue as err:
# Log with .error() as this is a specific, known type of issue
redacted_data = redact_data_smart(response_json)
_LOGGER.error(
"Failed to deserialize AirOS data due to an invalid field value: %s",
redacted_data,
)
raise AirOSKeyDataMissingError from err
except MissingField as err:
# Log with .exception() for a full stack trace
redacted_data = redact_data_smart(response_json)
_LOGGER.exception(
"Failed to deserialize AirOS data due to a missing field: %s",
redacted_data,
)
raise AirOSKeyDataMissingError from err

except json.JSONDecodeError:
_LOGGER.exception(
"JSON Decode Error in authenticated status response"
)
raise AirOSDataMissingError from None

return airos_data
else:
log = f"Authenticated status.cgi failed: {response.status}. Response: {response_text}"
_LOGGER.error(log)
raise AirOSDeviceConnectionError from None
_LOGGER.error(
"Status API call failed with status %d: %s",
response.status,
response_text,
)
raise AirOSDeviceConnectionError
except (
aiohttp.ClientError,
aiohttp.client_exceptions.ConnectionTimeoutError,
) as err:
_LOGGER.exception("Error during authenticated status.cgi call")
_LOGGER.error("Status API call failed: %s", err)
raise AirOSDeviceConnectionError from err

async def stakick(self, mac_address: str = None) -> bool:
Expand Down
104 changes: 96 additions & 8 deletions airos/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,95 @@

from dataclasses import dataclass
from enum import Enum
import ipaddress
import logging
import re
from typing import Any

from mashumaro import DataClassDictMixin

logger = logging.getLogger(__name__)

# Regex for a standard MAC address format (e.g., 01:23:45:67:89:AB)
# This handles both colon and hyphen separators.
MAC_ADDRESS_REGEX = re.compile(r"^([0-9a-fA-F]{2}[:-]){5}([0-9a-fA-F]{2})$")

# Regex for a MAC address mask (e.g., the redacted format 00:00:00:00:89:AB)
MAC_ADDRESS_MASK_REGEX = re.compile(r"^(00:){4}[0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}$")


# Helper functions
def is_mac_address(value: str) -> bool:
"""Check if a string is a valid MAC address."""
return bool(MAC_ADDRESS_REGEX.match(value))


def is_mac_address_mask(value: str) -> bool:
"""Check if a string is a valid MAC address mask (e.g., the redacted format)."""
return bool(MAC_ADDRESS_MASK_REGEX.match(value))


def is_ip_address(value: str) -> bool:
"""Check if a string is a valid IPv4 or IPv6 address."""
try:
ipaddress.ip_address(value)
return True
except ValueError:
return False


def redact_data_smart(data: dict) -> dict:
"""Recursively redacts sensitive keys in a dictionary."""
sensitive_keys = {
"hostname",
"essid",
"mac",
"apmac",
"hwaddr",
"lastip",
"ipaddr",
"ip6addr",
"device_id",
"sys_id",
"station_id",
"platform",
}

def _redact(d: dict):
if not isinstance(d, dict):
return d

redacted_d = {}
for k, v in d.items():
if k in sensitive_keys:
if isinstance(v, str) and (is_mac_address(v) or is_mac_address_mask(v)):
# Redact only the first 6 hex characters of a MAC address
redacted_d[k] = "00:11:22:33:" + v.replace("-", ":").upper()[-5:]
elif isinstance(v, str) and is_ip_address(v):
# Redact to a dummy local IP address
redacted_d[k] = "127.0.0.3"
elif isinstance(v, list) and all(
isinstance(i, str) and is_ip_address(i) for i in v
):
# Redact list of IPs to a dummy list
redacted_d[k] = ["127.0.0.3"]
else:
redacted_d[k] = "REDACTED"
elif isinstance(v, dict):
redacted_d[k] = _redact(v)
elif isinstance(v, list):
redacted_d[k] = [
_redact(item) if isinstance(item, dict) else item for item in v
]
else:
redacted_d[k] = v
return redacted_d

return _redact(data)


# Data class start


def _check_and_log_unknown_enum_value(
data_dict: dict[str, Any],
Expand Down Expand Up @@ -152,6 +234,7 @@ class Polling:
fixed_frame: bool
gps_sync: bool
ff_cap_rep: bool
flex_mode: int | None = None # Not present in all devices


@dataclass
Expand Down Expand Up @@ -207,9 +290,14 @@ class EthList:
class GPSData:
"""Leaf definition."""

lat: str
lon: str
fix: int
lat: float | None = None
lon: float | None = None
fix: int | None = None
sats: int | None = None # LiteAP GPS
dim: int | None = None # LiteAP GPS
dop: float | None = None # LiteAP GPS
alt: float | None = None # LiteAP GPS
time_synced: int | None = None # LiteAP GPS


@dataclass
Expand All @@ -235,7 +323,6 @@ class Remote:
totalram: int
freeram: int
netrole: str
mode: WirelessMode
sys_id: str
tx_throughput: int
rx_throughput: int
Expand All @@ -254,20 +341,21 @@ class Remote:
rx_bytes: int
antenna_gain: int
cable_loss: int
height: int
ethlist: list[EthList]
ipaddr: list[str]
ip6addr: list[str]
gps: GPSData
oob: bool
unms: UnmsStatus
airview: int
service: ServiceTime
mode: WirelessMode | None = None # Investigate why remotes can have no mode set
ip6addr: list[str] | None = None # For v4 only devices
height: int | None = None

@classmethod
def __pre_deserialize__(cls, d: dict[str, Any]) -> dict[str, Any]:
"""Pre-deserialize hook for Wireless."""
_check_and_log_unknown_enum_value(d, "mode", WirelessMode, "Wireless", "mode")
_check_and_log_unknown_enum_value(d, "mode", WirelessMode, "Remote", "mode")
return d


Expand Down Expand Up @@ -329,7 +417,6 @@ class Wireless:
"""Leaf definition."""

essid: str
mode: WirelessMode
ieeemode: IeeeMode
band: int
compat_11n: int
Expand Down Expand Up @@ -362,6 +449,7 @@ class Wireless:
count: int
sta: list[Station]
sta_disconnected: list[Disconnected]
mode: WirelessMode | None = None # Investigate further (see WirelessMode in Remote)

@classmethod
def __pre_deserialize__(cls, d: dict[str, Any]) -> dict[str, Any]:
Expand Down
6 changes: 4 additions & 2 deletions airos/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ def parse_airos_packet(self, data: bytes, host_ip: str) -> dict[str, Any] | None
return parsed_info


async def async_discover_devices(timeout: int) -> dict[str, dict[str, Any]]:
async def airos_discover_devices(
timeout: int = 30, listen_ip: str = "0.0.0.0", port: int = DISCOVERY_PORT
) -> dict[str, dict[str, Any]]:
"""Discover unconfigured airOS devices on the network for a given timeout.

This function sets up a listener, waits for a period, and returns
Expand All @@ -301,7 +303,7 @@ async def _async_airos_device_found(device_info: dict[str, Any]) -> None:
protocol,
) = await asyncio.get_running_loop().create_datagram_endpoint(
lambda: AirOSDiscoveryProtocol(_async_airos_device_found),
local_addr=("0.0.0.0", DISCOVERY_PORT),
local_addr=(listen_ip, port),
)
try:
await asyncio.sleep(timeout)
Expand Down
Loading