diff --git a/src/infuse_iot/commands.py b/src/infuse_iot/commands.py index 3b0254a..2cd878a 100644 --- a/src/infuse_iot/commands.py +++ b/src/infuse_iot/commands.py @@ -80,24 +80,3 @@ def data_progress_cb(self, offset: int) -> None: def handle_response(self, return_code, response): """Handle RPC_RSP""" raise NotImplementedError - - class VariableSizeResponse: - base_fields: list[tuple[str, type[ctypes._SimpleCData]]] = [] - var_name = "x" - var_type: type[ctypes._SimpleCData] = ctypes.c_ubyte - - @classmethod - def from_buffer_copy(cls, source, offset=0): - class response_base(ctypes.LittleEndianStructure): - _fields_ = cls.base_fields - _pack_ = 1 - - var_bytes = (len(source) - offset) - ctypes.sizeof(response_base) - assert var_bytes % ctypes.sizeof(cls.var_type) == 0 - var_num = var_bytes // ctypes.sizeof(cls.var_type) - - class response(ctypes.LittleEndianStructure): - _fields_ = [*cls.base_fields, (cls.var_name, cls.var_type * var_num)] - _pack_ = 1 - - return response.from_buffer_copy(source, offset) diff --git a/src/infuse_iot/generated/kv_definitions.py b/src/infuse_iot/generated/kv_definitions.py new file mode 100644 index 0000000..474a3e1 --- /dev/null +++ b/src/infuse_iot/generated/kv_definitions.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3 +# mypy: ignore-errors + +"""Autogenerated KV store definitions""" + +import ctypes + +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct + + +class structs: + class gcs_location(VLACompatLittleEndianStruct): + """Geographic Co-ordinate System location""" + + _fields_ = [ + ("latitude", ctypes.c_int32), + ("longitude", ctypes.c_int32), + ("height", ctypes.c_int32), + ] + _pack_ = 1 + + class bt_addr_le(VLACompatLittleEndianStruct): + """Bluetooth address type (bt_addr_le_t)""" + + _fields_ = [ + ("type", ctypes.c_uint8), + ("val", 6 * ctypes.c_uint8), + ] + _pack_ = 1 + + class kv_string(VLACompatLittleEndianStruct): + """String type""" + + _fields_ = [ + ("value_num", ctypes.c_uint8), + ] + vla_field = ("value", 0 * ctypes.c_char) + _pack_ = 1 + + class kv_mcuboot_img_sem_ver(VLACompatLittleEndianStruct): + """MCUboot semantic versioning struct""" + + _fields_ = [ + ("major", ctypes.c_uint8), + ("minor", ctypes.c_uint8), + ("revision", ctypes.c_uint16), + ("build_num", ctypes.c_uint32), + ] + _pack_ = 1 + + +class slots: + class reboots(VLACompatLittleEndianStruct): + """Number of times device has rebooted""" + + NAME = "REBOOTS" + BASE_ID = 0 + RANGE = 1 + _fields_ = [ + ("count", ctypes.c_uint32), + ] + _pack_ = 1 + + class bluetooth_addr(VLACompatLittleEndianStruct): + """Bluetooth advertising address""" + + NAME = "BLUETOOTH_ADDR" + BASE_ID = 1 + RANGE = 1 + _fields_ = [ + ("address", structs.bt_addr_le), + ] + _pack_ = 1 + + class exfat_disk_info(VLACompatLittleEndianStruct): + """exFAT disk information""" + + NAME = "EXFAT_DISK_INFO" + BASE_ID = 2 + RANGE = 1 + _fields_ = [ + ("block_count", ctypes.c_uint32), + ("block_size", ctypes.c_uint32), + ] + _pack_ = 1 + + class bluetooth_ctlr_version(VLACompatLittleEndianStruct): + """External Bluetooth controller version""" + + NAME = "BLUETOOTH_CTLR_VERSION" + BASE_ID = 3 + RANGE = 1 + _fields_ = [ + ("application", ctypes.c_uint32), + ("version", structs.kv_mcuboot_img_sem_ver), + ] + _pack_ = 1 + + class fixed_location(VLACompatLittleEndianStruct): + """Fixed global location of the device""" + + NAME = "FIXED_LOCATION" + BASE_ID = 10 + RANGE = 1 + _fields_ = [ + ("location", structs.gcs_location), + ] + _pack_ = 1 + + class wifi_ssid(VLACompatLittleEndianStruct): + """WiFi network name""" + + NAME = "WIFI_SSID" + BASE_ID = 20 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("ssid", structs.kv_string) + _pack_ = 1 + + class wifi_psk(VLACompatLittleEndianStruct): + """WiFi network password""" + + NAME = "WIFI_PSK" + BASE_ID = 21 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("psk", structs.kv_string) + _pack_ = 1 + + class ntp_server_url(VLACompatLittleEndianStruct): + """URL of the NTP server to use for time synchronisation""" + + NAME = "NTP_SERVER_URL" + BASE_ID = 30 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("url", structs.kv_string) + _pack_ = 1 + + class epacket_udp_url(VLACompatLittleEndianStruct): + """ePacket UDP server hostname""" + + NAME = "EPACKET_UDP_URL" + BASE_ID = 31 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("server", structs.kv_string) + _pack_ = 1 + + class epacket_udp_port(VLACompatLittleEndianStruct): + """ePacket UDP server port""" + + NAME = "EPACKET_UDP_PORT" + BASE_ID = 32 + RANGE = 1 + _fields_ = [ + ("port", ctypes.c_uint32), + ] + _pack_ = 1 + + class lte_modem_model(VLACompatLittleEndianStruct): + """Modem model as returned by AT+CGMM""" + + NAME = "LTE_MODEM_MODEL" + BASE_ID = 40 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("model", structs.kv_string) + _pack_ = 1 + + class lte_modem_firmware_revision(VLACompatLittleEndianStruct): + """Modem firmware revision as returned by AT+CGMR""" + + NAME = "LTE_MODEM_FIRMWARE_REVISION" + BASE_ID = 41 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("revision", structs.kv_string) + _pack_ = 1 + + class lte_modem_esn(VLACompatLittleEndianStruct): + """'Electronic Serial Number' as returned by AT+CGSN=0""" + + NAME = "LTE_MODEM_ESN" + BASE_ID = 42 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("esn", structs.kv_string) + _pack_ = 1 + + class lte_modem_imei(VLACompatLittleEndianStruct): + """'International Modem Equiment Identifier' as returned by AT+CGSN=1""" + + NAME = "LTE_MODEM_IMEI" + BASE_ID = 43 + RANGE = 1 + _fields_ = [ + ("imei", ctypes.c_uint64), + ] + _pack_ = 1 + + class lte_sim_uicc(VLACompatLittleEndianStruct): + """SIM Universal Identifier (https://www.itu.int/en/ITU-T/inr/forms/Pages/iin.aspx)""" + + NAME = "LTE_SIM_UICC" + BASE_ID = 44 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("uicc", structs.kv_string) + _pack_ = 1 + + class lte_pdp_config(VLACompatLittleEndianStruct): + """Packet Data Protocol (PDP) default context configuration""" + + NAME = "LTE_PDP_CONFIG" + BASE_ID = 45 + RANGE = 1 + _fields_ = [ + ] + vla_field = ("apn", structs.kv_string) + _pack_ = 1 + + class bluetooth_peer(VLACompatLittleEndianStruct): + """Bluetooth peer device""" + + NAME = "BLUETOOTH_PEER" + BASE_ID = 50 + RANGE = 1 + _fields_ = [ + ("address", structs.bt_addr_le), + ] + _pack_ = 1 + + class geofence(VLACompatLittleEndianStruct): + """Array of points defining a closed polygon""" + + NAME = "GEOFENCE" + BASE_ID = 100 + RANGE = 16 + _fields_ = [ + ] + vla_field = ("points", 0 * structs.gcs_location) + _pack_ = 1 + + class secure_storage_reserved(VLACompatLittleEndianStruct): + """Keys reserved for secure storage (do not enable)""" + + NAME = "SECURE_STORAGE_RESERVED" + BASE_ID = 30000 + RANGE = 10 + _fields_ = [ + ] + vla_field = ("data", 0 * ctypes.c_uint8) + _pack_ = 1 + + ID_MAPPING = { + 0: reboots, + 1: bluetooth_addr, + 2: exfat_disk_info, + 3: bluetooth_ctlr_version, + 10: fixed_location, + 20: wifi_ssid, + 21: wifi_psk, + 30: ntp_server_url, + 31: epacket_udp_url, + 32: epacket_udp_port, + 40: lte_modem_model, + 41: lte_modem_firmware_revision, + 42: lte_modem_esn, + 43: lte_modem_imei, + 44: lte_sim_uicc, + 45: lte_pdp_config, + 50: bluetooth_peer, + 100: geofence, + 101: geofence, + 102: geofence, + 103: geofence, + 104: geofence, + 105: geofence, + 106: geofence, + 107: geofence, + 108: geofence, + 109: geofence, + 110: geofence, + 111: geofence, + 112: geofence, + 113: geofence, + 114: geofence, + 115: geofence, + 30000: secure_storage_reserved, + 30001: secure_storage_reserved, + 30002: secure_storage_reserved, + 30003: secure_storage_reserved, + 30004: secure_storage_reserved, + 30005: secure_storage_reserved, + 30006: secure_storage_reserved, + 30007: secure_storage_reserved, + 30008: secure_storage_reserved, + 30009: secure_storage_reserved, + } diff --git a/src/infuse_iot/rpc_wrappers/kv_bt_peer.py b/src/infuse_iot/rpc_wrappers/kv_bt_peer.py index 9d14b2c..c2ac32c 100644 --- a/src/infuse_iot/rpc_wrappers/kv_bt_peer.py +++ b/src/infuse_iot/rpc_wrappers/kv_bt_peer.py @@ -6,7 +6,7 @@ import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.util.argparse import BtLeAddress -from infuse_iot.util.ctypes import bytes_to_uint8 +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct, bytes_to_uint8 class kv_bt_peer(InfuseRpcCommand, defs.kv_write): @@ -19,10 +19,8 @@ class request(ctypes.LittleEndianStructure): ] _pack_ = 1 - class response(InfuseRpcCommand.VariableSizeResponse): - base_fields = [] - var_name = "rc" - var_type = ctypes.c_int16 + class response(VLACompatLittleEndianStruct): + vla_field = ("rc", 0 * ctypes.c_int16) @staticmethod def kv_store_value_factory(id, value_bytes): diff --git a/src/infuse_iot/rpc_wrappers/kv_read.py b/src/infuse_iot/rpc_wrappers/kv_read.py index b98986b..783c15e 100644 --- a/src/infuse_iot/rpc_wrappers/kv_read.py +++ b/src/infuse_iot/rpc_wrappers/kv_read.py @@ -3,7 +3,11 @@ import ctypes import errno import os +from typing import Any +from tabulate import tabulate + +import infuse_iot.generated.kv_definitions as kv import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand @@ -69,12 +73,25 @@ def handle_response(self, return_code, response): for r in response: if r.len > 0: b = bytes(r.data) + kv_type = kv.slots.ID_MAPPING[r.id] + kv_val = kv_type.vla_from_buffer_copy(b) - print(f"Key: {r.id} ({r.len} bytes):") + print(f"Key: {kv_type.NAME} ({r.len} bytes):") print(f"\tHex: {b.hex()}") - try: - print(f"\tStr: {b.decode('utf-8')}") - except UnicodeDecodeError: - pass + + fields = [] + for field_name, field_val in kv_val.iter_fields(): + fmt_val: Any + if isinstance(field_val, ctypes.Array): + if field_val._type_ == ctypes.c_char: + fmt_val = bytes(field_val).decode("utf-8") + elif field_val._type_ == ctypes.c_ubyte: + fmt_val = bytes(field_val).hex() + else: + fmt_val = list(field_val) + else: + fmt_val = field_val + fields.append((field_name, fmt_val)) + print(tabulate(fields)) else: print(f"Key: {r.id} (Failed to read '{errno.errorcode[-r.len]}')") diff --git a/src/infuse_iot/rpc_wrappers/lte_at_cmd.py b/src/infuse_iot/rpc_wrappers/lte_at_cmd.py index c1e0751..e9aa016 100644 --- a/src/infuse_iot/rpc_wrappers/lte_at_cmd.py +++ b/src/infuse_iot/rpc_wrappers/lte_at_cmd.py @@ -5,15 +5,15 @@ import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct class lte_at_cmd(InfuseRpcCommand, defs.lte_at_cmd): class request(ctypes.LittleEndianStructure): _pack_ = 1 - class response(InfuseRpcCommand.VariableSizeResponse): - var_name = "rsp" - var_type = ctypes.c_char + class response(VLACompatLittleEndianStruct): + vla_field = ("rsp", 0 * ctypes.c_char) @classmethod def add_parser(cls, parser): @@ -32,6 +32,6 @@ def handle_response(self, return_code, response): if return_code != 0: print(f"Failed to run command ({os.strerror(-return_code)})") return - decoded = response.rsp.decode("utf-8").strip() + decoded = bytes(response.rsp).decode("utf-8").strip() print(decoded) diff --git a/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py b/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py index a3f5d55..6f29e44 100644 --- a/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py +++ b/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py @@ -6,6 +6,7 @@ import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct class lte_pdp_ctx(InfuseRpcCommand, defs.kv_write): @@ -18,10 +19,8 @@ class request(ctypes.LittleEndianStructure): ] _pack_ = 1 - class response(InfuseRpcCommand.VariableSizeResponse): - base_fields = [] - var_name = "rc" - var_type = ctypes.c_int16 + class response(VLACompatLittleEndianStruct): + vla_field = ("rc", 0 * ctypes.c_int16) class PDPFamily(enum.IntEnum): IPv4 = 0 diff --git a/src/infuse_iot/rpc_wrappers/wifi_configure.py b/src/infuse_iot/rpc_wrappers/wifi_configure.py index e3cad2f..4a40140 100644 --- a/src/infuse_iot/rpc_wrappers/wifi_configure.py +++ b/src/infuse_iot/rpc_wrappers/wifi_configure.py @@ -5,6 +5,7 @@ import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct class wifi_configure(InfuseRpcCommand, defs.kv_write): @@ -17,10 +18,8 @@ class request(ctypes.LittleEndianStructure): ] _pack_ = 1 - class response(InfuseRpcCommand.VariableSizeResponse): - base_fields = [] - var_name = "rc" - var_type = ctypes.c_int16 + class response(VLACompatLittleEndianStruct): + vla_field = ("rc", 0 * ctypes.c_int16) @staticmethod def kv_store_value_factory(id, value_bytes): diff --git a/src/infuse_iot/rpc_wrappers/zbus_channel_state.py b/src/infuse_iot/rpc_wrappers/zbus_channel_state.py index 8424202..70f2cd2 100644 --- a/src/infuse_iot/rpc_wrappers/zbus_channel_state.py +++ b/src/infuse_iot/rpc_wrappers/zbus_channel_state.py @@ -8,17 +8,17 @@ import infuse_iot.generated.rpc_definitions as rpc_defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.tdf import tdf_definitions as defs +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct class zbus_channel_state(InfuseRpcCommand, rpc_defs.zbus_channel_state): - class response(InfuseRpcCommand.VariableSizeResponse): - base_fields = [ + class response(VLACompatLittleEndianStruct): + _fields_ = [ ("pub_timestamp", ctypes.c_uint64), ("pub_count", ctypes.c_uint32), ("pub_period_ms", ctypes.c_uint32), ] - var_name = "data" - var_type = ctypes.c_byte + vla_field = ("data", 0 * ctypes.c_byte) class BatteryChannel: id = 0x43210000 diff --git a/src/infuse_iot/tools/rpc.py b/src/infuse_iot/tools/rpc.py index 3eafb2f..ebf1b16 100644 --- a/src/infuse_iot/tools/rpc.py +++ b/src/infuse_iot/tools/rpc.py @@ -80,6 +80,11 @@ def run(self): rpc_client = RpcClient(self._client, mtu, self._id, self.rx_handler) params = bytes(self._command.request_struct()) + if hasattr(self._command.response, "vla_from_buffer_copy"): # type: ignore + decode_fn = self._command.response.vla_from_buffer_copy # type: ignore + else: + decode_fn = self._command.response.from_buffer_copy # type: ignore + if self._command.RPC_DATA_SEND: hdr, rsp = rpc_client.run_data_send_cmd( self._command.COMMAND_ID, # type: ignore @@ -87,7 +92,7 @@ def run(self): params, self._command.data_payload(), self._command.data_progress_cb, - self._command.response.from_buffer_copy, # type: ignore + decode_fn, ) elif self._command.RPC_DATA_RECEIVE: hdr, rsp = rpc_client.run_data_recv_cmd( @@ -95,14 +100,14 @@ def run(self): self._command.auth_level(), params, self._command.data_recv_cb, - self._command.response.from_buffer_copy, # type: ignore + decode_fn, ) else: hdr, rsp = rpc_client.run_standard_cmd( self._command.COMMAND_ID, # type: ignore self._command.auth_level(), params, - self._command.response.from_buffer_copy, # type: ignore + decode_fn, ) # Handle response self._command.handle_response(hdr.return_code, rsp) diff --git a/src/infuse_iot/util/ctypes.py b/src/infuse_iot/util/ctypes.py index 03e5771..f9b92b3 100644 --- a/src/infuse_iot/util/ctypes.py +++ b/src/infuse_iot/util/ctypes.py @@ -1,7 +1,56 @@ #!/usr/bin/env python3 import ctypes +from collections.abc import Generator + +from typing_extensions import Any, Self def bytes_to_uint8(b: bytes): return (len(b) * ctypes.c_uint8)(*b) + + +class VLACompatLittleEndianStruct(ctypes.LittleEndianStructure): + vla_field: tuple[str, type[Any]] | None = None + + @classmethod + def vla_from_buffer_copy(cls, source, offset=0) -> Self: + base = cls.from_buffer_copy(source, offset) + if cls.vla_field is None: + return base + + remainder = source[ctypes.sizeof(cls) :] + vla_field_name, vla_field_type = cls.vla_field # type: ignore + + if issubclass(vla_field_type, ctypes.Array): + array_base: ctypes._CData = vla_field_type._type_ # type: ignore + + # Determine the number of VLA elements on "source" + vla_byte_len = (len(source) - offset) - ctypes.sizeof(cls) + vla_element_size = ctypes.sizeof(array_base) + if vla_byte_len % vla_element_size != 0: + raise TypeError(f"Unaligned VLA buffer for {cls} (len {len(source)})") + vla_num = vla_byte_len // vla_element_size + vla_type = vla_num * array_base + vla_val = vla_type.from_buffer_copy(remainder) + elif issubclass(vla_field_type, VLACompatLittleEndianStruct): + vla_val = vla_field_type.vla_from_buffer_copy(remainder) + else: + raise RuntimeError(f"Unhandled VLA type {vla_field_type}") + + setattr(base, vla_field_name, vla_val) + return base + + def iter_fields(self, prefix: str = "") -> Generator[tuple[str, Any], None, None]: + for field_name, _field_type in self._fields_: + val = getattr(self, field_name) + if isinstance(val, VLACompatLittleEndianStruct): + yield from val.iter_fields(f"{field_name}.") + else: + yield (f"{prefix}{field_name}", val) + if vla_field := self.vla_field: + val = getattr(self, vla_field[0]) + if isinstance(val, VLACompatLittleEndianStruct): + yield from val.iter_fields(f"{vla_field[0]}.") + else: + yield (f"{prefix}{vla_field[0]}", val)