diff --git a/src/infuse_iot/generated/rpc_definitions.py b/src/infuse_iot/generated/rpc_definitions.py index 01157a7..08711b4 100644 --- a/src/infuse_iot/generated/rpc_definitions.py +++ b/src/infuse_iot/generated/rpc_definitions.py @@ -35,7 +35,7 @@ class rpc_struct_kv_store_crc(ctypes.LittleEndianStructure): _fields_ = [ ("id", ctypes.c_uint16), - ("crc", ctypes.c_int32), + ("crc", ctypes.c_uint32), ] _pack_ = 1 @@ -111,10 +111,10 @@ class rpc_struct_lte_state(ctypes.LittleEndianStructure): ("mnc", ctypes.c_uint16), ("cell_id", ctypes.c_uint32), ("tac", ctypes.c_uint32), - ("tau", ctypes.c_uint32), + ("tau", ctypes.c_int32), ("earfcn", ctypes.c_uint16), ("band", ctypes.c_uint8), - ("psm_active_time", ctypes.c_uint16), + ("psm_active_time", ctypes.c_int16), ("edrx_interval", ctypes.c_float), ("edrx_paging_window", ctypes.c_float), ("rsrp", ctypes.c_int16), diff --git a/src/infuse_iot/rpc_wrappers/kv_read.py b/src/infuse_iot/rpc_wrappers/kv_read.py index 783c15e..9954fcb 100644 --- a/src/infuse_iot/rpc_wrappers/kv_read.py +++ b/src/infuse_iot/rpc_wrappers/kv_read.py @@ -73,7 +73,12 @@ def handle_response(self, return_code, response): for r in response: if r.len > 0: b = bytes(r.data) - kv_type = kv.slots.ID_MAPPING[r.id] + try: + kv_type = kv.slots.ID_MAPPING[r.id] + except KeyError: + print(f"Key: {r.id} ({r.len} bytes):") + print(f"\tHex: {b.hex()}") + continue kv_val = kv_type.vla_from_buffer_copy(b) print(f"Key: {kv_type.NAME} ({r.len} bytes):") diff --git a/src/infuse_iot/rpc_wrappers/kv_reflect_crcs.py b/src/infuse_iot/rpc_wrappers/kv_reflect_crcs.py new file mode 100644 index 0000000..161d02e --- /dev/null +++ b/src/infuse_iot/rpc_wrappers/kv_reflect_crcs.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +import ctypes +import os + +import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct + + +class kv_reflect_crcs(InfuseRpcCommand, defs.kv_reflect_crcs): + HELP = "Read KV store reflection crc values" + DESCRIPTION = "Read KV store reflection crc values" + + class response(VLACompatLittleEndianStruct): + _fields_ = [ + ("num", ctypes.c_uint16), + ("remaining", ctypes.c_uint16), + ] + vla_field = ("crcs", 0 * defs.rpc_struct_kv_store_crc) + _pack_ = 1 + + @classmethod + def add_parser(cls, parser): + parser.add_argument("--offset", type=int, default=0, help="Offset to start CRC read at") + + def __init__(self, args): + self.offset = args.offset + + def request_struct(self): + return self.request(self.offset) + + def handle_response(self, return_code, response): + if return_code != 0: + print(f"Invalid query ({os.strerror(-return_code)})") + return + + print(f"Slot CRCs ({response.remaining} remaining):") + for slot in response.crcs: + print(f"\t{slot.id:5d}: 0x{slot.crc:08x}") diff --git a/src/infuse_iot/rpc_wrappers/kv_write.py b/src/infuse_iot/rpc_wrappers/kv_write.py new file mode 100644 index 0000000..fdee397 --- /dev/null +++ b/src/infuse_iot/rpc_wrappers/kv_write.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 + +import ctypes +import os + +import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct, bytes_to_uint8 + + +class kv_write(InfuseRpcCommand, defs.kv_write): + HELP = "Write an arbitrary kv value" + DESCRIPTION = "Write an arbitrary kv value" + + class request(ctypes.LittleEndianStructure): + _fields_ = [ + ("num", ctypes.c_uint8), + ] + _pack_ = 1 + + class response(VLACompatLittleEndianStruct): + vla_field = ("rc", 0 * ctypes.c_int16) + + @staticmethod + def kv_store_value_factory(id, value_bytes): + class kv_store_value(ctypes.LittleEndianStructure): + _fields_ = [ + ("id", ctypes.c_uint16), + ("len", ctypes.c_uint16), + ("data", ctypes.c_ubyte * len(value_bytes)), + ] + _pack_ = 1 + + return kv_store_value(id, len(value_bytes), bytes_to_uint8(value_bytes)) + + @classmethod + def add_parser(cls, parser): + parser.add_argument("--key", type=int, required=True, help="KV key ID") + parser.add_argument("--value", type=str, required=True, help="KV value as hex string") + + def __init__(self, args): + self.key = args.key + self.value = bytes.fromhex(args.value) + + def request_struct(self): + kv_struct = self.kv_store_value_factory(self.key, self.value) + request_bytes = bytes(kv_struct) + return bytes(self.request(1)) + request_bytes + + def handle_response(self, return_code, response): + if return_code != 0: + print(f"Invalid data buffer ({os.strerror(-return_code)})") + return + + def print_status(name, rc): + if rc < 0: + print(f"{name} failed to write ({os.strerror(-rc)})") + elif rc == 0: + print(f"{name} already matched") + else: + print(f"{name} updated") + + print_status(f"{self.key} value", response.rc[0]) diff --git a/src/infuse_iot/rpc_wrappers/lte_state.py b/src/infuse_iot/rpc_wrappers/lte_state.py index 6b68df1..0d6adac 100644 --- a/src/infuse_iot/rpc_wrappers/lte_state.py +++ b/src/infuse_iot/rpc_wrappers/lte_state.py @@ -42,10 +42,10 @@ class lte_state_struct(ctypes.LittleEndianStructure): ("mnc", ctypes.c_uint16), ("cell_id", ctypes.c_uint32), ("tac", ctypes.c_uint32), - ("tau", ctypes.c_uint32), + ("tau", ctypes.c_int32), ("earfcn", ctypes.c_uint16), ("band", ctypes.c_uint8), - ("psm_active_time", ctypes.c_uint16), + ("psm_active_time", ctypes.c_int16), ("edrx_interval", ctypes.c_float), ("edrx_window", ctypes.c_float), ("rsrp", ctypes.c_int16), diff --git a/src/infuse_iot/tools/provision.py b/src/infuse_iot/tools/provision.py index fcf8bf5..51064ba 100644 --- a/src/infuse_iot/tools/provision.py +++ b/src/infuse_iot/tools/provision.py @@ -132,7 +132,7 @@ def run(self): interface: soc.ProvisioningInterface if self._vendor == "nrf": interface = nrf.Interface(self._snr) - if self._vendor == "stm": + elif self._vendor == "stm": interface = stm.Interface() else: raise NotImplementedError(f"Unhandled vendor '{self._vendor}'") diff --git a/src/infuse_iot/tools/tdf_list.py b/src/infuse_iot/tools/tdf_list.py index 361a279..56b58f2 100644 --- a/src/infuse_iot/tools/tdf_list.py +++ b/src/infuse_iot/tools/tdf_list.py @@ -52,7 +52,7 @@ def run(self) -> None: if idx == 0: if tdf.time is not None: if tdf.period is None: - time = "" + time = InfuseTime.utc_time_string(tdf.time) else: offset = (len(tdf.data) - 1) * tdf.period time = InfuseTime.utc_time_string(tdf.time + offset)