diff --git a/src/infuse_iot/generated/rpc_definitions.py b/src/infuse_iot/generated/rpc_definitions.py index 3c88040..64ca756 100644 --- a/src/infuse_iot/generated/rpc_definitions.py +++ b/src/infuse_iot/generated/rpc_definitions.py @@ -895,6 +895,25 @@ class response(VLACompatLittleEndianStruct): _pack_ = 1 +class bt_mcumgr_reboot: + """Connect to a Bluetooth device and run the MCUMGR reboot command""" + + HELP = "Connect to a Bluetooth device and run the MCUMGR reboot command" + DESCRIPTION = "Connect to a Bluetooth device and run the MCUMGR reboot command" + COMMAND_ID = 54 + + class request(VLACompatLittleEndianStruct): + _fields_ = [ + ("peer", rpc_struct_bt_addr_le), + ("conn_timeout_ms", ctypes.c_uint16), + ] + _pack_ = 1 + + class response(VLACompatLittleEndianStruct): + _fields_ = [] + _pack_ = 1 + + class gravity_reference_update: """Store the current accelerometer vector as the gravity reference""" @@ -1045,6 +1064,7 @@ class response(VLACompatLittleEndianStruct): "bt_disconnect", "bt_file_copy_basic", "bt_file_copy_coap", + "bt_mcumgr_reboot", "gravity_reference_update", "security_state", "data_sender", diff --git a/src/infuse_iot/rpc_wrappers/bt_mcumgr_reboot.py b/src/infuse_iot/rpc_wrappers/bt_mcumgr_reboot.py new file mode 100644 index 0000000..0c20eae --- /dev/null +++ b/src/infuse_iot/rpc_wrappers/bt_mcumgr_reboot.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 + + +import infuse_iot.definitions.rpc as defs +from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.definitions.rpc import ( + rpc_enum_bt_le_addr_type, + rpc_struct_bt_addr_le, +) +from infuse_iot.util.argparse import BtLeAddress +from infuse_iot.util.ctypes import bytes_to_uint8 +from infuse_iot.zephyr.errno import errno +from infuse_iot.zephyr.hci import error + + +class bt_mcumgr_reboot(InfuseRpcCommand, defs.bt_mcumgr_reboot): + @classmethod + def add_parser(cls, parser): + parser.add_argument("--timeout", type=int, default=5000, help="Connection timeout (ms)") + addr_group = parser.add_mutually_exclusive_group(required=True) + addr_group.add_argument("--public", type=BtLeAddress, help="Public Bluetooth address") + addr_group.add_argument("--random", type=BtLeAddress, help="Random Bluetooth address") + + def __init__(self, args): + self.args = args + + def request_struct(self) -> defs.bt_mcumgr_reboot.request: + if self.args.public: + peer = rpc_struct_bt_addr_le( + rpc_enum_bt_le_addr_type.PUBLIC, + bytes_to_uint8(self.args.public.to_bytes(6, "little")), + ) + else: + peer = rpc_struct_bt_addr_le( + rpc_enum_bt_le_addr_type.RANDOM, + bytes_to_uint8(self.args.random.to_bytes(6, "little")), + ) + + return self.request( + peer, + self.args.timeout, + ) + + def handle_response(self, return_code, response): + if return_code < 0: + print(f"Failed to reboot ({errno.strerror(-return_code)})") + return + elif return_code > 0: + print(f"Failed to reboot ({error.strerror(return_code)})") + + print("Reboot request sent") diff --git a/src/infuse_iot/rpc_wrappers/kv_bt_peer.py b/src/infuse_iot/rpc_wrappers/kv_bt_peer.py index 0f12835..29a9e0e 100644 --- a/src/infuse_iot/rpc_wrappers/kv_bt_peer.py +++ b/src/infuse_iot/rpc_wrappers/kv_bt_peer.py @@ -5,8 +5,9 @@ import infuse_iot.definitions.rpc as defs from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.rpc_wrappers.kv_write import kv_write from infuse_iot.util.argparse import BtLeAddress -from infuse_iot.util.ctypes import VLACompatLittleEndianStruct, bytes_to_uint8 +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct from infuse_iot.zephyr.errno import errno @@ -23,18 +24,6 @@ class request(ctypes.LittleEndianStructure): class response(VLACompatLittleEndianStruct): vla_field = ("rc", 0 * ctypes.c_int16) - @staticmethod - def kv_store_value_factory(id, value_bytes): - class kv_store_value(ctypes.LittleEndianStructure): - _fields_ = [ - ("id", ctypes.c_uint16), - ("len", ctypes.c_uint16), - ("data", ctypes.c_ubyte * len(value_bytes)), - ] - _pack_ = 1 - - return kv_store_value(id, len(value_bytes), bytes_to_uint8(value_bytes)) - @classmethod def add_parser(cls, parser): addr_group = parser.add_mutually_exclusive_group(required=True) @@ -54,7 +43,7 @@ def __init__(self, args): self.addr = addr def request_struct(self): - addr_struct = self.kv_store_value_factory(50, self.addr) + addr_struct = kv_write.kv_store_value_factory(50, self.addr) request_bytes = bytes(addr_struct) return bytes(self.request(1)) + request_bytes diff --git a/src/infuse_iot/rpc_wrappers/kv_read.py b/src/infuse_iot/rpc_wrappers/kv_read.py index 6b1b1b6..8bc2eb2 100644 --- a/src/infuse_iot/rpc_wrappers/kv_read.py +++ b/src/infuse_iot/rpc_wrappers/kv_read.py @@ -20,7 +20,7 @@ class request(ctypes.LittleEndianStructure): class response: @classmethod - def from_buffer_copy(cls, source: bytes, _offset: int = 0): + def from_buffer_copy(cls, source: bytes, _offset: int = 0) -> list: values = [] while len(source) > 0: @@ -50,6 +50,10 @@ class kv_store_value(ctypes.LittleEndianStructure): source = source[ctypes.sizeof(struct) :] return values + @classmethod + def vla_from_buffer_copy(cls, source: bytes, offset: int = 0) -> list: + return cls.from_buffer_copy(source, offset) + @classmethod def add_parser(cls, parser): parser.add_argument("--keys", "-k", required=True, type=int, nargs="+", help="Keys to read") diff --git a/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py b/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py index 1d3548c..08758b6 100644 --- a/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py +++ b/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py @@ -5,6 +5,7 @@ import infuse_iot.definitions.rpc as defs from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.rpc_wrappers.kv_write import kv_write from infuse_iot.util.ctypes import VLACompatLittleEndianStruct from infuse_iot.zephyr.errno import errno @@ -28,18 +29,6 @@ class PDPFamily(enum.IntEnum): IPv4v6 = 2 NonIP = 3 - @staticmethod - def kv_store_value_factory(id, value_bytes): - class kv_store_value(ctypes.LittleEndianStructure): - _fields_ = [ - ("id", ctypes.c_uint16), - ("len", ctypes.c_uint16), - ("data", ctypes.c_ubyte * len(value_bytes)), - ] - _pack_ = 1 - - return kv_store_value(id, len(value_bytes), (ctypes.c_ubyte * len(value_bytes))(*value_bytes)) - @classmethod def add_parser(cls, parser): parser.add_argument("--apn", "-a", type=str, required=True, help="Access Point Name") @@ -66,7 +55,7 @@ def request_struct(self): apn_bytes = self.args.apn.encode("utf-8") + b"\x00" apn_bytes = len(apn_bytes).to_bytes(1, "little") + apn_bytes - pdp_struct = self.kv_store_value_factory(45, family_bytes + apn_bytes) + pdp_struct = kv_write.kv_store_value_factory(45, family_bytes + apn_bytes) request_bytes = bytes(pdp_struct) return bytes(self.request(1)) + request_bytes diff --git a/src/infuse_iot/rpc_wrappers/wifi_configure.py b/src/infuse_iot/rpc_wrappers/wifi_configure.py index 60bacba..7b20f05 100644 --- a/src/infuse_iot/rpc_wrappers/wifi_configure.py +++ b/src/infuse_iot/rpc_wrappers/wifi_configure.py @@ -5,7 +5,8 @@ import infuse_iot.definitions.rpc as defs import infuse_iot.zephyr.wifi as wifi from infuse_iot.commands import InfuseRpcCommand -from infuse_iot.util.ctypes import UINT8_MAX, VLACompatLittleEndianStruct, bytes_to_uint8 +from infuse_iot.rpc_wrappers.kv_write import kv_write +from infuse_iot.util.ctypes import UINT8_MAX, VLACompatLittleEndianStruct from infuse_iot.zephyr.errno import errno @@ -22,18 +23,6 @@ class request(ctypes.LittleEndianStructure): class response(VLACompatLittleEndianStruct): vla_field = ("rc", 0 * ctypes.c_int16) - @staticmethod - def kv_store_value_factory(id, value_bytes): - class kv_store_value(ctypes.LittleEndianStructure): - _fields_ = [ - ("id", ctypes.c_uint16), - ("len", ctypes.c_uint16), - ("data", ctypes.c_ubyte * len(value_bytes)), - ] - _pack_ = 1 - - return kv_store_value(id, len(value_bytes), bytes_to_uint8(value_bytes)) - @classmethod def add_parser(cls, parser): parser.add_argument("--ssid", "-s", type=str, help="Network name") @@ -80,9 +69,9 @@ def request_struct(self): psk_bytes = self.args.psk.encode("utf-8") + b"\x00" chan_bytes = self.args.band.to_bytes(1, "little") + self.args.channel.to_bytes(1, "little") - ssid_struct = self.kv_store_value_factory(20, len(ssid_bytes).to_bytes(1, "little") + ssid_bytes) - psk_struct = self.kv_store_value_factory(21, len(psk_bytes).to_bytes(1, "little") + psk_bytes) - chan_struct = self.kv_store_value_factory(22, chan_bytes) + ssid_struct = kv_write.kv_store_value_factory(20, len(ssid_bytes).to_bytes(1, "little") + ssid_bytes) + psk_struct = kv_write.kv_store_value_factory(21, len(psk_bytes).to_bytes(1, "little") + psk_bytes) + chan_struct = kv_write.kv_store_value_factory(22, chan_bytes) request_bytes = bytes(ssid_struct) + bytes(psk_struct) + bytes(chan_struct) return bytes(self.request(3)) + request_bytes diff --git a/src/infuse_iot/util/internal.py b/src/infuse_iot/util/internal.py index 7fa4aa2..06cd4c1 100644 --- a/src/infuse_iot/util/internal.py +++ b/src/infuse_iot/util/internal.py @@ -16,13 +16,13 @@ def extension_load(name: str) -> None | types.ModuleType: defs_path = infuse_iot.credentials.get_custom_definitions_path() if defs_path is None: return None - tdf_extensions = pathlib.Path(defs_path) / f"{name}.py" - if not tdf_extensions.exists(): + extensions_file = pathlib.Path(defs_path) / f"{name}.py" + if not extensions_file.exists(): return None try: # Import the extension file - spec = importlib.util.spec_from_file_location(f"infuse_iot.extension.{name}", str(tdf_extensions)) + spec = importlib.util.spec_from_file_location(f"infuse_iot.extension.{name}", str(extensions_file)) if spec is None or spec.loader is None: return None file_module = importlib.util.module_from_spec(spec)