diff --git a/scripts/apn_set.py b/scripts/apn_set.py new file mode 100755 index 0000000..321f086 --- /dev/null +++ b/scripts/apn_set.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 + +import argparse +import ctypes + +from rich.live import Live +from rich.status import Status +from rich.table import Table + +import infuse_iot.generated.kv_definitions as kv +import infuse_iot.generated.rpc_definitions as rpc +from infuse_iot.epacket.packet import Auth +from infuse_iot.generated.tdf_definitions import readings +from infuse_iot.rpc_client import RpcClient +from infuse_iot.socket_comms import ( + GatewayRequestConnectionRequest, + LocalClient, + default_multicast_address, +) +from infuse_iot.tdf import TDF +from infuse_iot.util.ctypes import VLACompatLittleEndianStruct + + +class APNSetter: + def __init__(self, args: argparse.Namespace): + self.app_id = args.app + self.apn = args.apn + self.client = LocalClient(default_multicast_address(), 1.0) + self.decoder = TDF() + self.state = "Scanning" + + self.already: list[int] = [] + self.updated: list[int] = [] + + def progress_table(self): + table = Table() + table.add_column() + table.add_column("Count") + table.add_row("Updated", str(len(self.updated))) + table.add_row("Already", str(len(self.already))) + + meta = Table(box=None) + meta.add_column() + meta.add_row(Status(self.state)) + meta.add_row(table) + + return meta + + class response(VLACompatLittleEndianStruct): + _fields_ = [] + vla_field = ("rc", ctypes.c_int16) + + def state_update(self, live: Live, state: str): + self.state = state + live.update(self.progress_table()) + + def announce_observed(self, live: Live, infuse_id: int, pkt: readings.announce): + if infuse_id in self.updated or infuse_id in self.already: + return + if pkt.application != self.app_id: + return + try: + self.state_update(live, f"Connecting to {infuse_id:016X}") + with self.client.connection(infuse_id, GatewayRequestConnectionRequest.DataType.COMMAND) as mtu: + rpc_client = RpcClient(self.client, mtu, infuse_id) + + ipv4 = 0 + family_bytes = ipv4.to_bytes(1, "little") + apn_bytes = self.apn.encode("utf-8") + b"\x00" + val_bytes = family_bytes + len(apn_bytes).to_bytes(1, "little") + apn_bytes + + all_vals = ( + bytes(rpc.rpc_struct_kv_store_value(kv.slots.lte_pdp_config.BASE_ID, len(val_bytes))) + val_bytes + ) + params = bytes(rpc.kv_write.request(1)) + all_vals + + hdr, rsp = rpc_client.run_standard_cmd( + rpc.kv_write.COMMAND_ID, Auth.DEVICE, params, self.response.from_buffer_copy + ) + if hdr.return_code == 0: + assert rsp is not None and hasattr(rsp, "rc") + if rsp.rc[0] == 0: + self.already.append(infuse_id) + else: + self.updated.append(infuse_id) + + except ConnectionRefusedError: + self.state_update(live, "Scanning") + return + self.state_update(live, "Scanning") + + def run(self): + with Live(self.progress_table(), refresh_per_second=4) as live: + for source, announce in self.client.observe_announce(): + self.announce_observed(live, source.infuse_id, announce) + live.update(self.progress_table()) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser("Configure the LTE APN of devices") + addr_group = parser.add_mutually_exclusive_group(required=True) + addr_group.add_argument("--app", type=lambda x: int(x, 0), help="Application ID to configure") + parser.add_argument("--apn", type=str, required=True, help="LTE APN value") + + args = parser.parse_args() + + try: + tool = APNSetter(args) + tool.run() + except KeyboardInterrupt: + pass diff --git a/scripts/reboot_count_reset.py b/scripts/reboot_count_reset.py new file mode 100755 index 0000000..853cf90 --- /dev/null +++ b/scripts/reboot_count_reset.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 + +import argparse + +from rich.live import Live +from rich.status import Status +from rich.table import Table + +import infuse_iot.generated.kv_definitions as kv +import infuse_iot.generated.rpc_definitions as rpc +from infuse_iot.epacket.packet import Auth +from infuse_iot.generated.tdf_definitions import readings +from infuse_iot.rpc_client import RpcClient +from infuse_iot.socket_comms import ( + GatewayRequestConnectionRequest, + LocalClient, + default_multicast_address, +) +from infuse_iot.tdf import TDF + + +class RebootCountResetter: + def __init__(self, args: argparse.Namespace): + self.app_id = args.app + self.count = args.count + self.client = LocalClient(default_multicast_address(), 1.0) + self.decoder = TDF() + self.state = "Scanning" + + self.already: list[int] = [] + self.updated: list[int] = [] + + def progress_table(self): + table = Table() + table.add_column() + table.add_column("Count") + table.add_row("Updated", str(len(self.updated))) + table.add_row("Already", str(len(self.already))) + + meta = Table(box=None) + meta.add_column() + meta.add_row(table) + meta.add_row(Status(self.state)) + + return meta + + def state_update(self, live: Live, state: str): + self.state = state + live.update(self.progress_table()) + + def announce_observed(self, live: Live, infuse_id: int, pkt: readings.announce): + if pkt.application != self.app_id: + return + if pkt.reboots == self.count: + if (infuse_id not in self.already) and (infuse_id not in self.updated): + self.already.append(infuse_id) + return + try: + self.state_update(live, f"Connecting to {infuse_id:016X}") + with self.client.connection(infuse_id, GatewayRequestConnectionRequest.DataType.COMMAND) as mtu: + rpc_client = RpcClient(self.client, mtu, infuse_id) + + key_val = bytes(kv.slots.reboots(self.count)) + all_vals = bytes(rpc.rpc_struct_kv_store_value(kv.slots.reboots.BASE_ID, len(key_val))) + key_val + params = bytes(rpc.kv_write.request(1)) + all_vals + + hdr, _ = rpc_client.run_standard_cmd( + rpc.kv_write.COMMAND_ID, Auth.DEVICE, params, rpc.kv_write.response.from_buffer_copy + ) + if hdr.return_code == 0: + self.updated.append(infuse_id) + + except ConnectionRefusedError: + self.state_update(live, "Scanning") + return + self.state_update(live, "Scanning") + + def run(self): + with Live(self.progress_table(), refresh_per_second=4) as live: + for source, announce in self.client.observe_announce(): + self.announce_observed(live, source.infuse_id, announce) + live.update(self.progress_table()) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser("Reset reboot counters to a common value") + addr_group = parser.add_mutually_exclusive_group(required=True) + addr_group.add_argument("--app", type=lambda x: int(x, 0), help="Application ID to reset") + parser.add_argument("--count", type=int, default=0, help="Value to reset count to") + + args = parser.parse_args() + + try: + tool = RebootCountResetter(args) + tool.run() + except KeyboardInterrupt: + pass diff --git a/src/infuse_iot/rpc_wrappers/zbus_channel_state.py b/src/infuse_iot/rpc_wrappers/zbus_channel_state.py index 57405ea..f5f45c3 100644 --- a/src/infuse_iot/rpc_wrappers/zbus_channel_state.py +++ b/src/infuse_iot/rpc_wrappers/zbus_channel_state.py @@ -32,6 +32,14 @@ class LocationChannel(ctypes.LittleEndianStructure): id = 0x43210004 data = defs.readings.gcs_wgs84_llha + class NavPvtUbxChannel(ctypes.LittleEndianStructure): + id = 0x43210007 + data = defs.readings.ubx_nav_pvt + + class NavPvtNRFChannel(ctypes.LittleEndianStructure): + id = 0x43210008 + data = defs.readings.nrf9x_gnss_pvt + @classmethod def add_parser(cls, parser): group = parser.add_mutually_exclusive_group(required=True) @@ -56,6 +64,20 @@ def add_parser(cls, parser): const=cls.LocationChannel, help="Location channel", ) + group.add_argument( + "--nav-pvt-ubx", + dest="channel", + action="store_const", + const=cls.NavPvtUbxChannel, + help="ublox NAV-PVT channel", + ) + group.add_argument( + "--nav-pvt-nrf", + dest="channel", + action="store_const", + const=cls.NavPvtNRFChannel, + help="nRF9x NAV-PVT channel", + ) def __init__(self, args): self._channel = args.channel diff --git a/src/infuse_iot/socket_comms.py b/src/infuse_iot/socket_comms.py index 29f457b..9dd037d 100644 --- a/src/infuse_iot/socket_comms.py +++ b/src/infuse_iot/socket_comms.py @@ -27,6 +27,7 @@ class Type(enum.IntEnum): CONNECTION_FAILED = 1 CONNECTION_CREATED = 2 CONNECTION_DROPPED = 3 + KNOWN_DEVICES = 4 def to_json(self) -> dict: """Convert class to json dictionary""" @@ -44,6 +45,8 @@ def from_json(cls, values: dict) -> Self: return cast(Self, ClientNotificationConnectionCreated.from_json(values)) elif values["type"] == cls.Type.CONNECTION_DROPPED: return cast(Self, ClientNotificationConnectionDropped.from_json(values)) + elif values["type"] == cls.Type.KNOWN_DEVICES: + return cast(Self, ClientNotificationObservedDevices.from_json(values)) raise NotImplementedError @@ -62,6 +65,23 @@ def from_json(cls, values: dict) -> Self: return cls(PacketReceived.from_json(values["epacket"])) +class ClientNotificationObservedDevices(ClientNotification): + TYPE = ClientNotification.Type.KNOWN_DEVICES + + def __init__(self, devices: dict[int, dict]): + self.devices = devices + + def to_json(self) -> dict: + """Convert class to json dictionary""" + return {"type": int(self.TYPE), "devices": json.dumps(self.devices)} + + @classmethod + def from_json(cls, values: dict) -> Self: + raw = json.loads(values["devices"]) + decoded = {int(k): v for k, v in raw.items()} + return cls(decoded) + + class ClientNotificationConnection(ClientNotification): TYPE = 0 @@ -110,6 +130,7 @@ class Type(enum.IntEnum): EPACKET_SEND = 0 CONNECTION_REQUEST = 1 CONNECTION_RELEASE = 2 + KNOWN_DEVICES = 3 def to_json(self) -> dict: """Convert class to json dictionary""" @@ -124,6 +145,8 @@ def from_json(cls, values: dict) -> Self: return cast(Self, GatewayRequestConnectionRequest.from_json(values)) elif values["type"] == cls.Type.CONNECTION_RELEASE: return cast(Self, GatewayRequestConnectionRelease.from_json(values)) + elif values["type"] == cls.Type.KNOWN_DEVICES: + return cast(Self, GatewayRequestObservedDevices.from_json(values)) raise NotImplementedError @@ -143,6 +166,22 @@ def from_json(cls, values: dict) -> Self: return cls(PacketOutput.from_json(values["epacket"])) +class GatewayRequestObservedDevices(GatewayRequest): + """Request list of known devices""" + + TYPE = GatewayRequest.Type.KNOWN_DEVICES + + def __init__(self): + pass + + def to_json(self) -> dict: + return {"type": int(self.TYPE)} + + @classmethod + def from_json(cls, values: dict) -> Self: + return cls() + + class GatewayRequestConnection(GatewayRequest): TYPE = 0 diff --git a/src/infuse_iot/tools/data_logger_sync.py b/src/infuse_iot/tools/data_logger_sync.py new file mode 100644 index 0000000..5b24354 --- /dev/null +++ b/src/infuse_iot/tools/data_logger_sync.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 + +"""Synchronise data logger state from remote devices""" + +__author__ = "Jordan Yates" +__copyright__ = "Copyright 2025, Embeint Inc" + +import binascii +import glob +import os +import pathlib + +from rich.live import Live +from rich.progress import ( + DownloadColumn, + Progress, + TransferSpeedColumn, +) +from rich.status import Status +from rich.table import Table + +from infuse_iot.commands import InfuseCommand +from infuse_iot.epacket.packet import Auth +from infuse_iot.generated.rpc_definitions import data_logger_read, rpc_enum_data_logger +from infuse_iot.generated.tdf_definitions import readings +from infuse_iot.rpc_client import RpcClient +from infuse_iot.socket_comms import ( + GatewayRequestConnectionRequest, + LocalClient, + default_multicast_address, +) +from infuse_iot.util.argparse import ValidDir + + +class DeviceState: + BLOCK_SIZE = 512 + + def __init__(self, path: pathlib.Path): + self.path = path + self.on_disk: int = 0 + self.on_device: int | None = None + self.downloaded: int = 0 + if not self.path.exists(): + self.path.touch() + else: + self.on_disk = os.path.getsize(self.path) // self.BLOCK_SIZE + + def observe(self, announce: readings.announce): + self.on_device = announce.blocks + + def append_data(self, data: bytes): + assert len(data) % self.BLOCK_SIZE == 0 + new_blocks = len(data) // self.BLOCK_SIZE + + with self.path.open("+ba") as f: + f.write(data) + self.on_disk += new_blocks + self.downloaded += new_blocks + + +class SubCommand(InfuseCommand): + NAME = "data_logger_sync" + HELP = "Synchronise data logger state from remote devices" + DESCRIPTION = "Synchronise data logger state from remote devices" + + def __init__(self, args): + self._client = LocalClient(default_multicast_address(), 60.0) + self._min_rssi: int | None = args.rssi + self._app = args.app + self._out = args.out + self._blocks_max = args.blocks + self._logger = args.logger + self._device_state: dict[int, DeviceState] = {} + + self.bytes_to_read = 0 + self.pending_bytes = b"" + self.task = None + self.state = "Scanning" + self.progress = Progress( + *Progress.get_default_columns(), + DownloadColumn(), + TransferSpeedColumn(), + ) + + for file in glob.glob(f"{self._out}/*_{self._logger}.bin"): + file_path = pathlib.Path(file) + name_parts = file_path.name.split("_") + device_id = int(name_parts[0], 16) + + self._device_state[device_id] = DeviceState(file_path) + + @classmethod + def add_parser(cls, parser): + parser.add_argument("--out", "-o", type=ValidDir, required=True, help="Output folder for synced data") + parser.add_argument("--blocks", "-b", type=int, default=500, help="Number of blocks to download per connection") + parser.add_argument("--app", "-a", type=lambda x: int(x, 0), help="Application ID to limit sync to") + parser.add_argument("--rssi", "-r", type=int, help="Minimum RSSI to attempt downloading data") + logger_group = parser.add_mutually_exclusive_group(required=True) + logger_group.add_argument( + "--onboard", + dest="logger", + action="store_const", + const=rpc_enum_data_logger.FLASH_ONBOARD, + help="Synchronise onboard loggers", + ) + logger_group.add_argument( + "--removable", + dest="logger", + action="store_const", + const=rpc_enum_data_logger.FLASH_REMOVABLE, + help="Synchronise removable loggers", + ) + + def progress_table(self): + table = Table() + table.add_column("Device ID") + table.add_column("On Disk") + table.add_column("On Device") + table.add_column("Downloaded") + for device, state in self._device_state.items(): + on_device = str(state.on_device) if state.on_device is not None else "?" + table.add_row(f"{device:016x}", str(state.on_disk), on_device, str(state.downloaded)) + + meta = Table(box=None) + meta.add_column() + meta.add_row(table) + meta.add_row(Status(self.state)) + meta.add_row(self.progress) + + return meta + + def state_update(self, live: Live, state: str): + self.state = state + live.update(self.progress_table()) + + def data_progress_cb(self, offset: int, payload: bytes): + if self.task is None: + self.state = "Reading data logger" + self.task = self.progress.add_task("", total=self.bytes_to_read) + self.pending_bytes += payload + self.progress.update(self.task, completed=offset) + + def handle_sync(self, live: Live, device_id: int, state: DeviceState): + self.state_update(live, f"Connecting to {device_id:016X}") + assert state.on_device is not None + try: + with self._client.connection(device_id, GatewayRequestConnectionRequest.DataType.COMMAND) as mtu: + self.state_update(live, f"Downloading blocks from {device_id:016X}") + rpc_client = RpcClient(self._client, mtu, device_id) + blocks_pending = state.on_device - state.on_disk + blocks_to_read = min(blocks_pending, self._blocks_max) + last_block = state.on_disk + blocks_to_read - 1 + self.bytes_to_read = 512 * blocks_to_read + params = data_logger_read.request(self._logger, state.on_disk, last_block) + self.pending_bytes = b"" + hdr, rsp = rpc_client.run_data_recv_cmd( + data_logger_read.COMMAND_ID, + Auth.DEVICE, + bytes(params), + self.bytes_to_read, + self.data_progress_cb, + data_logger_read.response.from_buffer_copy, + ) + if hdr.return_code == 0: + assert isinstance(rsp, data_logger_read.response) + if rsp.sent_len == len(self.pending_bytes) and rsp.sent_crc == binascii.crc32(self.pending_bytes): + state.append_data(self.pending_bytes) + if self.task is not None: + self.progress.remove_task(self.task) + self.task = None + + except ConnectionRefusedError: + self.state_update(live, "Scanning") + except ConnectionAbortedError: + self.state_update(live, "Scanning") + + def run(self): + with Live(self.progress_table(), refresh_per_second=4) as live: + for source, announce in self._client.observe_announce(): + self.state_update(live, "Scanning") + if self._app and announce.application != self._app: + continue + if announce.flags & 0x01: + # Skip data on removable loggers + continue + + if source.infuse_id not in self._device_state: + output_file = self._out / f"{source.infuse_id:016x}_{self._logger}.bin" + self._device_state[source.infuse_id] = DeviceState(output_file) + state = self._device_state[source.infuse_id] + state.observe(announce) + + # Is signal strong enough to connect? + if self._min_rssi and source.rssi < self._min_rssi: + continue + + assert state.on_device is not None + if state.on_disk < state.on_device: + self.handle_sync(live, source.infuse_id, state) diff --git a/src/infuse_iot/tools/ota_upgrade.py b/src/infuse_iot/tools/ota_upgrade.py index 54a8a8c..c202d3a 100644 --- a/src/infuse_iot/tools/ota_upgrade.py +++ b/src/infuse_iot/tools/ota_upgrade.py @@ -37,6 +37,7 @@ class SubCommand(InfuseCommand): def __init__(self, args): self._client = LocalClient(default_multicast_address(), 60.0) self._min_rssi: int | None = args.rssi + self._single_id: int | None = args.id self._release: ValidRelease = args.release self._app_name = self._release.metadata["application"]["primary"] self._app_id = self._release.metadata["application"]["id"] @@ -63,6 +64,7 @@ def add_parser(cls, parser): "--release", "-r", type=ValidRelease, required=True, help="Application release to upgrade to" ) parser.add_argument("--rssi", type=int, help="Minimum RSSI to attempt upgrade process") + parser.add_argument("--id", type=lambda x: int(x, 0), help="Single device to upgrade") def progress_table(self): table = Table() @@ -102,6 +104,12 @@ def run(self): self.state_update(live, "Scanning") if announce.application != self._app_id: continue + if self._single_id: + if self._single_id != source.infuse_id: + continue + if self._single_id in self._handled: + # The one device we care about has been upgraded + return if source.infuse_id in self._handled: continue v = announce.version