From 24526e8626873ecf30239c32f24fc9538ad0ae4f Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Wed, 27 Nov 2024 14:25:54 +1000 Subject: [PATCH 1/6] util: threading: extract signaled thread Extract the simple signaled thread class from `gateway` and add a basic test. Signed-off-by: Jordan Yates --- src/infuse_iot/tools/gateway.py | 19 +------------------ src/infuse_iot/util/threading.py | 21 +++++++++++++++++++++ tests/util/test_threading.py | 27 +++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 18 deletions(-) create mode 100644 src/infuse_iot/util/threading.py create mode 100644 tests/util/test_threading.py diff --git a/src/infuse_iot/tools/gateway.py b/src/infuse_iot/tools/gateway.py index 3310338..364c2fd 100644 --- a/src/infuse_iot/tools/gateway.py +++ b/src/infuse_iot/tools/gateway.py @@ -20,6 +20,7 @@ from infuse_iot.util.argparse import ValidFile from infuse_iot.util.console import Console +from infuse_iot.util.threading import SignaledThread from infuse_iot.common import InfuseType, InfuseID from infuse_iot.commands import InfuseCommand from infuse_iot.serial_comms import RttPort, SerialPort, SerialFrame @@ -142,24 +143,6 @@ def security_state_done(pkt: PacketReceived, _: int, response: bytes): cb_event.wait(1.0) -class SignaledThread(threading.Thread): - """Thread that can be signaled to terminate""" - - def __init__(self, fn): - self._fn = fn - self._sig = threading.Event() - super().__init__(target=self.run_loop) - - def stop(self): - """Signal thread to terminate""" - self._sig.set() - - def run_loop(self): - """Run the thread function in a loop""" - while not self._sig.is_set(): - self._fn() - - class SerialRxThread(SignaledThread): """Receive serial frames from the serial port""" diff --git a/src/infuse_iot/util/threading.py b/src/infuse_iot/util/threading.py new file mode 100644 index 0000000..bc34158 --- /dev/null +++ b/src/infuse_iot/util/threading.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +import threading + + +class SignaledThread(threading.Thread): + """Thread that can be signaled to terminate""" + + def __init__(self, fn): + self._fn = fn + self._sig = threading.Event() + super().__init__(target=self.run_loop) + + def stop(self): + """Signal thread to terminate""" + self._sig.set() + + def run_loop(self): + """Run the thread function in a loop""" + while not self._sig.is_set(): + self._fn() diff --git a/tests/util/test_threading.py b/tests/util/test_threading.py new file mode 100644 index 0000000..ad7a487 --- /dev/null +++ b/tests/util/test_threading.py @@ -0,0 +1,27 @@ +import os +import time + +from infuse_iot.util.threading import SignaledThread + +assert "TOXTEMPDIR" in os.environ, "you must run these tests using tox" + + +count = 0 + + +def dummy_work(): + global count + count += 1 + time.sleep(0.1) + + +def test_signaled_thread(): + t = SignaledThread(dummy_work) + t.start() + assert t.is_alive() + time.sleep(1.0) + assert t.is_alive() + t.stop() + t.join(0.5) + assert not t.is_alive() + assert 10 <= count <= 11 From 133e6b835b6d121c6ad405fac2b453aedecfeb64 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Wed, 27 Nov 2024 14:30:00 +1000 Subject: [PATCH 2/6] tools: localhost: use `SignaledThread` Use the `SignaledThread` class instead of a local variant. Signed-off-by: Jordan Yates --- src/infuse_iot/tools/localhost.py | 93 ++++++++++++++----------------- 1 file changed, 43 insertions(+), 50 deletions(-) diff --git a/src/infuse_iot/tools/localhost.py b/src/infuse_iot/tools/localhost.py index b3d4488..d485e72 100644 --- a/src/infuse_iot/tools/localhost.py +++ b/src/infuse_iot/tools/localhost.py @@ -14,6 +14,7 @@ from aiohttp.web_runner import GracefulExit from infuse_iot.util.console import Console +from infuse_iot.util.threading import SignaledThread from infuse_iot.common import InfuseType from infuse_iot.commands import InfuseCommand from infuse_iot.socket_comms import ( @@ -36,7 +37,6 @@ def __init__(self, _): self._columns = {} self._data = {} - self._thread_end = threading.Event() self._client = LocalClient(default_multicast_address(), 1.0) self._decoder = TDF() @@ -152,53 +152,46 @@ def column_title(struct, name): return out def recv_thread(self): - while True: - msg = self._client.receive() - if self._thread_end.is_set(): - break - if msg is None: - continue - if msg.type != ClientNotification.Type.EPACKET_RECV: - continue - if msg.epacket.ptype != InfuseType.TDF: - continue - - source = msg.epacket.route[0] - - self._data_lock.acquire(blocking=True) - - if source.infuse_id not in self._data: - self._data[source.infuse_id] = { - "infuse_id": f"0x{source.infuse_id:016x}", - } - self._data[source.infuse_id]["time"] = InfuseTime.utc_time_string( - time.time() - ) - if source.interface == interface.ID.BT_ADV: - addr_bytes = source.interface_address.val.addr_val.to_bytes(6, "big") - addr_str = ":".join([f"{x:02x}" for x in addr_bytes]) - self._data[source.infuse_id]["bt_addr"] = addr_str - self._data[source.infuse_id]["bt_rssi"] = source.rssi - - for tdf in self._decoder.decode(msg.epacket.payload): - t = tdf.data[-1] - if t.name not in self._columns: - self._columns[t.name] = self.tdf_columns(t) - if t.name not in self._data[source.infuse_id]: - self._data[source.infuse_id][t.name] = {} - - for field in t.iter_fields(): - if field.subfield: - if field.field not in self._data[source.infuse_id][t.name]: - self._data[source.infuse_id][t.name][field.field] = {} - self._data[source.infuse_id][t.name][field.field][ - field.subfield - ] = field.val_fmt() - else: - self._data[source.infuse_id][t.name][field.field] = ( - field.val_fmt() - ) - self._data_lock.release() + msg = self._client.receive() + if msg is None: + return + if msg.type != ClientNotification.Type.EPACKET_RECV: + return + if msg.epacket.ptype != InfuseType.TDF: + return + + source = msg.epacket.route[0] + + self._data_lock.acquire(blocking=True) + + if source.infuse_id not in self._data: + self._data[source.infuse_id] = { + "infuse_id": f"0x{source.infuse_id:016x}", + } + self._data[source.infuse_id]["time"] = InfuseTime.utc_time_string(time.time()) + if source.interface == interface.ID.BT_ADV: + addr_bytes = source.interface_address.val.addr_val.to_bytes(6, "big") + addr_str = ":".join([f"{x:02x}" for x in addr_bytes]) + self._data[source.infuse_id]["bt_addr"] = addr_str + self._data[source.infuse_id]["bt_rssi"] = source.rssi + + for tdf in self._decoder.decode(msg.epacket.payload): + t = tdf.data[-1] + if t.name not in self._columns: + self._columns[t.name] = self.tdf_columns(t) + if t.name not in self._data[source.infuse_id]: + self._data[source.infuse_id][t.name] = {} + + for field in t.iter_fields(): + if field.subfield: + if field.field not in self._data[source.infuse_id][t.name]: + self._data[source.infuse_id][t.name][field.field] = {} + self._data[source.infuse_id][t.name][field.field][ + field.subfield + ] = field.val_fmt() + else: + self._data[source.infuse_id][t.name][field.field] = field.val_fmt() + self._data_lock.release() def run(self): Console.init() @@ -208,7 +201,7 @@ def run(self): # Route for WebSocket app.router.add_get("/ws", self.websocket_handler) - rx_thread = threading.Thread(target=self.recv_thread) + rx_thread = SignaledThread(self.recv_thread) rx_thread.start() # Run server @@ -217,5 +210,5 @@ def run(self): except GracefulExit: pass finally: - self._thread_end.set() + rx_thread.stop() rx_thread.join(1.0) From b5937fbd99bb0bff658a42586f6c0fe5e67fdcc6 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Wed, 27 Nov 2024 14:41:21 +1000 Subject: [PATCH 3/6] ruff.toml: added Move `ruff` configuration from `tox.ini` to `ruff.toml`, so that running `ruff check .` works. Signed-off-by: Jordan Yates --- ruff.toml | 24 ++++++++++++++++++++++++ tox.ini | 22 ---------------------- 2 files changed, 24 insertions(+), 22 deletions(-) create mode 100644 ruff.toml diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..f5831fe --- /dev/null +++ b/ruff.toml @@ -0,0 +1,24 @@ +line-length = 120 +indent-width = 4 + +exclude = [ + "src/infuse_iot/api_client/" +] + +[lint] +select = [ + "B", # flake8-bugbear + "E", # pycodestyle + "F", # pyflakes + "I", # isort + "SIM", # flake8-simplify + "UP", # pyupgrade + "W", # pycodestyle warnings + ] +ignore = [ + "SIM105", # Allow try-except-pass + "SIM108", # Allow if-else blocks instead of forcing ternary operator + ] + +[format] +line-ending = "lf" diff --git a/tox.ini b/tox.ini index f5d4d32..c7f7187 100644 --- a/tox.ini +++ b/tox.ini @@ -8,28 +8,6 @@ [tox] envlist=py3 -[tool.ruff] -line-length = 120 -indent-width = 4 - -[tool.ruff.lint] -select = [ - "B", # flake8-bugbear - "E", # pycodestyle - "F", # pyflakes - "I", # isort - "SIM", # flake8-simplify - "UP", # pyupgrade - "W", # pycodestyle warnings - ] -ignore = [ - "SIM108", # Allow if-else blocks instead of forcing ternary operator - "UP027", # deprecated pyupgrade rule - ] - -[tool.ruff.format] -line-ending = "lf" - [testenv] deps = setuptools-scm From bc40bf9cf5115eef570745bca6934a532928025e Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Wed, 27 Nov 2024 14:42:17 +1000 Subject: [PATCH 4/6] treewide: automated ruff changes Apply the automated fixes from running `ruff check --fix .` and `ruff format .`. Signed-off-by: Jordan Yates --- src/infuse_iot/app/main.py | 15 +- src/infuse_iot/commands.py | 4 +- src/infuse_iot/credentials.py | 4 +- src/infuse_iot/database.py | 27 +--- src/infuse_iot/diff.py | 139 +++++------------- src/infuse_iot/epacket/common.py | 1 + src/infuse_iot/epacket/interface.py | 19 +-- src/infuse_iot/epacket/packet.py | 62 +++----- src/infuse_iot/generated/rpc_definitions.py | 37 ++--- src/infuse_iot/generated/tdf_base.py | 8 +- src/infuse_iot/generated/tdf_definitions.py | 3 +- .../rpc_wrappers/application_info.py | 2 +- .../rpc_wrappers/bt_connect_infuse.py | 24 +-- src/infuse_iot/rpc_wrappers/bt_disconnect.py | 12 +- src/infuse_iot/rpc_wrappers/coap_download.py | 2 +- src/infuse_iot/rpc_wrappers/fault.py | 2 +- .../rpc_wrappers/file_write_basic.py | 6 +- src/infuse_iot/rpc_wrappers/kv_read.py | 6 +- src/infuse_iot/rpc_wrappers/last_reboot.py | 6 +- src/infuse_iot/rpc_wrappers/lte_at_cmd.py | 2 +- src/infuse_iot/rpc_wrappers/lte_modem_info.py | 4 +- src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py | 10 +- src/infuse_iot/rpc_wrappers/lte_state.py | 16 +- src/infuse_iot/rpc_wrappers/reboot.py | 6 +- src/infuse_iot/rpc_wrappers/security_state.py | 22 +-- src/infuse_iot/rpc_wrappers/time_get.py | 11 +- src/infuse_iot/rpc_wrappers/time_set.py | 5 +- src/infuse_iot/rpc_wrappers/wifi_configure.py | 10 +- src/infuse_iot/rpc_wrappers/wifi_scan.py | 3 +- src/infuse_iot/rpc_wrappers/wifi_state.py | 2 +- .../rpc_wrappers/zbus_channel_state.py | 3 +- src/infuse_iot/serial_comms.py | 11 +- src/infuse_iot/socket_comms.py | 48 ++---- src/infuse_iot/tdf.py | 8 +- src/infuse_iot/time.py | 4 +- src/infuse_iot/tools/cloud.py | 45 ++---- src/infuse_iot/tools/credentials.py | 9 +- src/infuse_iot/tools/csv_annotate.py | 17 +-- src/infuse_iot/tools/csv_plot.py | 8 +- src/infuse_iot/tools/gateway.py | 98 +++++------- src/infuse_iot/tools/localhost.py | 17 +-- src/infuse_iot/tools/native_bt.py | 29 ++-- src/infuse_iot/tools/provision.py | 44 ++---- src/infuse_iot/tools/rpc.py | 29 ++-- src/infuse_iot/tools/serial_throughput.py | 6 +- src/infuse_iot/tools/tdf_csv.py | 18 +-- src/infuse_iot/tools/tdf_list.py | 12 +- src/infuse_iot/util/argparse.py | 1 - src/infuse_iot/util/console.py | 1 + src/infuse_iot/util/crypto.py | 10 +- tests/test_main.py | 4 +- 51 files changed, 295 insertions(+), 597 deletions(-) diff --git a/src/infuse_iot/app/main.py b/src/infuse_iot/app/main.py index cf8890d..536c028 100644 --- a/src/infuse_iot/app/main.py +++ b/src/infuse_iot/app/main.py @@ -7,9 +7,10 @@ __copyright__ = "Copyright 2024, Embeint Inc" import argparse -import sys -import pkgutil import importlib.util +import pkgutil +import sys + import argcomplete import infuse_iot.tools @@ -23,9 +24,7 @@ class InfuseApp: def __init__(self): self.args = None self.parser = argparse.ArgumentParser("infuse") - self.parser.add_argument( - "--version", action="version", version=f"{__version__}" - ) + self.parser.add_argument("--version", action="version", version=f"{__version__}") self._tools = {} # Load tools self._load_tools(self.parser) @@ -40,9 +39,7 @@ def run(self, argv): tool.run() def _load_tools(self, parser: argparse.ArgumentParser): - tools_parser = parser.add_subparsers( - title="commands", metavar="", required=True - ) + tools_parser = parser.add_subparsers(title="commands", metavar="", required=True) # Iterate over tools for _, name, _ in pkgutil.walk_packages(infuse_iot.tools.__path__): @@ -50,7 +47,7 @@ def _load_tools(self, parser: argparse.ArgumentParser): module = importlib.import_module(full_name) # Add tool to parser - tool_cls: InfuseCommand = getattr(module, "SubCommand") + tool_cls: InfuseCommand = module.SubCommand parser = tools_parser.add_parser( tool_cls.NAME, help=tool_cls.HELP, diff --git a/src/infuse_iot/commands.py b/src/infuse_iot/commands.py index 7828460..4be08b1 100644 --- a/src/infuse_iot/commands.py +++ b/src/infuse_iot/commands.py @@ -7,9 +7,7 @@ import argparse import ctypes - -from typing import List, Type, Tuple - +from typing import List, Tuple, Type from infuse_iot.epacket.packet import Auth diff --git a/src/infuse_iot/credentials.py b/src/infuse_iot/credentials.py index d08ec45..425b0b7 100644 --- a/src/infuse_iot/credentials.py +++ b/src/infuse_iot/credentials.py @@ -36,7 +36,5 @@ def load_network(network_id: int): username = f"network-{network_id:06x}" key = keyring.get_password("infuse-iot", username) if key is None: - raise FileNotFoundError( - f"Network key {network_id:06x} does not exist in keyring" - ) + raise FileNotFoundError(f"Network key {network_id:06x} does not exist in keyring") return yaml.safe_load(key) diff --git a/src/infuse_iot/database.py b/src/infuse_iot/database.py index d3670f2..19302de 100644 --- a/src/infuse_iot/database.py +++ b/src/infuse_iot/database.py @@ -1,15 +1,15 @@ #!/usr/bin/env python3 -import binascii import base64 +import binascii from typing import Dict, Tuple from infuse_iot.api_client import Client from infuse_iot.api_client.api.default import get_shared_secret from infuse_iot.api_client.models import Key -from infuse_iot.util.crypto import hkdf_derive -from infuse_iot.epacket.interface import Address as InterfaceAddress from infuse_iot.credentials import get_api_key, load_network +from infuse_iot.epacket.interface import Address as InterfaceAddress +from infuse_iot.util.crypto import hkdf_derive class NoKeyError(KeyError): @@ -82,21 +82,14 @@ def observe_device( if network_id is not None: self.devices[address].network_id = network_id if device_id is not None: - if ( - self.devices[address].device_id is not None - and self.devices[address].device_id != device_id - ): - raise DeviceKeyChangedError( - f"Device key for {address:016x} has changed" - ) + if self.devices[address].device_id is not None and self.devices[address].device_id != device_id: + raise DeviceKeyChangedError(f"Device key for {address:016x} has changed") self.devices[address].device_id = device_id if bt_addr is not None: self.bt_addr[bt_addr] = address self.devices[address].bt_addr = bt_addr - def observe_security_state( - self, address: int, cloud_key: bytes, device_key: bytes, network_id: int - ) -> None: + def observe_security_state(self, address: int, cloud_key: bytes, device_key: bytes, network_id: int) -> None: """Update device state based on security_state response""" if address not in self.devices: self.devices[address] = self.DeviceState(address) @@ -128,9 +121,7 @@ def _network_key(self, network_id: int, interface: bytes, gps_time: int) -> byte key_id = (network_id, interface, time_idx) if key_id not in self._derived_keys: - self._derived_keys[key_id] = hkdf_derive( - base, time_idx.to_bytes(4, "little"), interface - ) + self._derived_keys[key_id] = hkdf_derive(base, time_idx.to_bytes(4, "little"), interface) return self._derived_keys[key_id] @@ -155,9 +146,7 @@ def has_network_id(self, address: int) -> bool: return False return self.devices[address].network_id is not None - def infuse_id_from_bluetooth( - self, bt_addr: InterfaceAddress.BluetoothLeAddr - ) -> int | None: + def infuse_id_from_bluetooth(self, bt_addr: InterfaceAddress.BluetoothLeAddr) -> int | None: """Get Bluetooth address associated with device""" return self.bt_addr.get(bt_addr, None) diff --git a/src/infuse_iot/diff.py b/src/infuse_iot/diff.py index 29898e4..354b188 100644 --- a/src/infuse_iot/diff.py +++ b/src/infuse_iot/diff.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 import argparse -import enum -import ctypes import binascii - +import ctypes +import enum from collections import defaultdict -from typing import List, Dict, Tuple, Type +from typing import Dict, List, Tuple, Type + from typing_extensions import Self @@ -77,11 +77,7 @@ def from_bytes( or opcode == OpCode.WRITE_LEN_U32 ): return WriteInstr.from_bytes(b, offset, original_offset) - if ( - opcode == OpCode.ADDR_SHIFT_S8 - or opcode == OpCode.ADDR_SHIFT_S16 - or opcode == OpCode.ADDR_SET_U32 - ): + if opcode == OpCode.ADDR_SHIFT_S8 or opcode == OpCode.ADDR_SHIFT_S16 or opcode == OpCode.ADDR_SET_U32: return SetAddrInstr.from_bytes(b, offset, original_offset) if opcode == OpCode.PATCH: return PatchInstr.from_bytes(b, offset, original_offset) @@ -132,9 +128,7 @@ def ctypes_class(self): return self.SetAddrU32 @classmethod - def from_bytes( - cls, b: bytes, offset: int, original_offset: int - ) -> Tuple[Self, int, int]: + def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> Tuple[Self, int, int]: opcode = b[offset] if opcode == OpCode.ADDR_SHIFT_S8: s8 = cls.ShiftAddrS8.from_buffer_copy(b, offset) @@ -154,9 +148,7 @@ def from_bytes( def __bytes__(self): instr = self.ctypes_class() - if instr == self.ShiftAddrS8: - val = self.shift - elif instr == self.ShiftAddrS16: + if instr == self.ShiftAddrS8 or instr == self.ShiftAddrS16: val = self.shift else: val = self.new @@ -165,9 +157,7 @@ def __bytes__(self): def __str__(self): if -32768 <= self.shift <= 32767: - return ( - f" ADDR: shifting {self.shift} (from {self.old:08x} to {self.new:08x})" - ) + return f" ADDR: shifting {self.shift} (from {self.old:08x} to {self.new:08x})" else: return f" ADDR: now {self.new:08x} (shift of {self.new - self.old})" @@ -252,9 +242,7 @@ def _from_opcode(cls, op: OpCode) -> Type[CopyGeneric]: raise RuntimeError @classmethod - def from_bytes( - cls, b: bytes, offset: int, original_offset: int - ) -> Tuple[Self, int, int]: + def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> Tuple[Self, int, int]: opcode = OpCode.from_byte(b[offset]) op_class = cls._from_opcode(opcode) s = op_class.from_buffer_copy(b, offset) @@ -357,9 +345,7 @@ def _from_opcode(cls, op: OpCode) -> Type[WriteGeneric]: raise RuntimeError @classmethod - def from_bytes( - cls, b: bytes, offset: int, original_offset: int - ) -> Tuple[Self, int, int]: + def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> Tuple[Self, int, int]: opcode = OpCode.from_byte(b[offset]) op_class = cls._from_opcode(opcode) s = op_class.from_buffer_copy(b, offset) @@ -433,9 +419,7 @@ def from_bytes(cls, b: bytes, offset: int, original_offset: int): break assert write_len != 0 original_offset += write_len - operations.append( - WriteInstr(b[offset + length : offset + length + write_len]) - ) + operations.append(WriteInstr(b[offset + length : offset + length + write_len])) length += write_len return cls(operations), length, original_offset @@ -540,9 +524,7 @@ def _naive_diff(cls, old: bytes, new: bytes, hash_len: int = 8): # If word exists in original image if val in pre_hash: if write_pending: - instr.append( - WriteInstr(new[write_start : write_start + write_pending]) - ) + instr.append(WriteInstr(new[write_start : write_start + write_pending])) write_pending = 0 old_match = -100 @@ -566,8 +548,7 @@ def _naive_diff(cls, old: bytes, new: bytes, hash_len: int = 8): while ( (new_offset + this_match) < len(new) and (orig_offset + this_match) < len(old) - and new[new_offset + this_match] - == old[orig_offset + this_match] + and new[new_offset + this_match] == old[orig_offset + this_match] ): this_match += 1 @@ -612,9 +593,7 @@ def _cleanup_jumps(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: # ADDR, COPY, ADRR if instr.shift == -instructions[1].shift: # Replace with a write instead - merged.append( - WriteInstr(old[instr.new : instr.new + copy.length]) - ) + merged.append(WriteInstr(old[instr.new : instr.new + copy.length])) replaced = True instructions.pop(0) instructions.pop(0) @@ -627,11 +606,7 @@ def _cleanup_jumps(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: if instr.shift == -instructions[2].shift: write = instructions[1] # Replace with a merged write instead - merged.append( - WriteInstr( - old[instr.new : instr.new + copy.length] + write.data - ) - ) + merged.append(WriteInstr(old[instr.new : instr.new + copy.length] + write.data)) replaced = True instructions.pop(0) instructions.pop(0) @@ -766,9 +741,7 @@ def _write_crack(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: return cracked @classmethod - def _gen_patch_instr( - cls, bin_orig: bytes, bin_new: bytes - ) -> Tuple[Dict, List[Instr]]: + def _gen_patch_instr(cls, bin_orig: bytes, bin_new: bytes) -> Tuple[Dict, List[Instr]]: best_patch = [] best_patch_len = 2**32 @@ -848,9 +821,7 @@ def _patch_load(cls, patch_binary: bytes): }, } - header_crc = binascii.crc32( - patch_binary[: ctypes.sizeof(hdr) - ctypes.sizeof(ctypes.c_uint32)] - ) + header_crc = binascii.crc32(patch_binary[: ctypes.sizeof(hdr) - ctypes.sizeof(ctypes.c_uint32)]) if header_crc != hdr.header_crc: raise ValidationError("Patch header validation failed") if len(data) != hdr.patch_file.length: @@ -866,9 +837,7 @@ def _patch_load(cls, patch_binary: bytes): patch_offset = 0 original_offset = 0 while patch_offset < len(data): - instr, length, original_offset = Instr.from_bytes( - data, patch_offset, original_offset - ) + instr, length, original_offset = Instr.from_bytes(data, patch_offset, original_offset) patch_offset += length instructions.append(instr) @@ -889,9 +858,7 @@ def generate( print(f"Original File: {meta['original']['len']:6d} bytes") print(f" New File: {meta['new']['len']:6d} bytes") - print( - f" Patch File: {len(bin_patch):6d} bytes ({ratio:.2f}%) ({len(instructions):5d} instructions)" - ) + print(f" Patch File: {len(bin_patch):6d} bytes ({ratio:.2f}%) ({len(instructions):5d} instructions)") if verbose: class_count: Dict[OpCode, int] = defaultdict(int) @@ -911,32 +878,18 @@ def generate( return bin_patch @classmethod - def validation( - cls, bin_original: bytes, invalid_length: bool, invalid_crc: bool - ) -> bytes: + def validation(cls, bin_original: bytes, invalid_length: bool, invalid_crc: bool) -> bytes: assert len(bin_original) > 1024 # Manually construct an instruction set that runs all instructions instructions: List[Instr] = [] - instructions.append( - WriteInstr(bin_original[:8], cls_override=WriteInstr.WriteU4) - ) - instructions.append( - WriteInstr(bin_original[8:16], cls_override=WriteInstr.WriteU12) - ) + instructions.append(WriteInstr(bin_original[:8], cls_override=WriteInstr.WriteU4)) + instructions.append(WriteInstr(bin_original[8:16], cls_override=WriteInstr.WriteU12)) instructions.append(SetAddrInstr(16, 8, cls_override=SetAddrInstr.ShiftAddrS8)) - instructions.append( - WriteInstr(bin_original[16:128], cls_override=WriteInstr.WriteU20) - ) - instructions.append( - SetAddrInstr(120, 200, cls_override=SetAddrInstr.ShiftAddrS16) - ) - instructions.append( - WriteInstr(bin_original[128:256], cls_override=WriteInstr.WriteU32) - ) - instructions.append( - SetAddrInstr(328, 256, cls_override=SetAddrInstr.SetAddrU32) - ) + instructions.append(WriteInstr(bin_original[16:128], cls_override=WriteInstr.WriteU20)) + instructions.append(SetAddrInstr(120, 200, cls_override=SetAddrInstr.ShiftAddrS16)) + instructions.append(WriteInstr(bin_original[128:256], cls_override=WriteInstr.WriteU32)) + instructions.append(SetAddrInstr(328, 256, cls_override=SetAddrInstr.SetAddrU32)) instructions.append(CopyInstr(8, cls_override=CopyInstr.CopyU4)) instructions.append(CopyInstr(8, cls_override=CopyInstr.CopyU12)) instructions.append(CopyInstr(128 - 16, cls_override=CopyInstr.CopyU20)) @@ -1033,9 +986,7 @@ def dump( print(f"Original File: {meta['original']['len']:6d} bytes") print(f" New File: {meta['new']['len']:6d} bytes") - print( - f" Patch File: {len(bin_patch)} bytes ({len(instructions):5d} instructions)" - ) + print(f" Patch File: {len(bin_patch)} bytes ({len(instructions):5d} instructions)") class_count: Dict[OpCode, int] = defaultdict(int) for instr in instructions: @@ -1049,9 +1000,7 @@ def dump( print("") print("Total WRITE data:") - print( - f"\t{total_write_bytes} bytes ({100*total_write_bytes/len(bin_patch):.2f}%)" - ) + print(f"\t{total_write_bytes} bytes ({100*total_write_bytes/len(bin_patch):.2f}%)") print("") print("Instruction Count:") @@ -1066,32 +1015,20 @@ def dump( if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument( - "--verbose", "-v", action="store_true", help="Verbose command output" - ) + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose command output") subparser = parser.add_subparsers(dest="command", title="Commands", required=True) # Generate patch file generate_args = subparser.add_parser("generate", help="Generate a patch file") generate_args.add_argument("original", help="Original file to use as base image") - generate_args.add_argument( - "new", help="New file that will be the result of applying the patch" - ) + generate_args.add_argument("new", help="New file that will be the result of applying the patch") generate_args.add_argument("patch", help="Output patch file name") # Generate validation patch file - validation_args = subparser.add_parser( - "validation", help="Generate a patch file for validating appliers" - ) - validation_args.add_argument( - "--invalid-length", action="store_true", help="Incorrect output file length" - ) - validation_args.add_argument( - "--invalid-crc", action="store_true", help="Incorrect output file CRC" - ) - validation_args.add_argument( - "input_file", help="File to use as base image and desired output" - ) + validation_args = subparser.add_parser("validation", help="Generate a patch file for validating appliers") + validation_args.add_argument("--invalid-length", action="store_true", help="Incorrect output file length") + validation_args.add_argument("--invalid-crc", action="store_true", help="Incorrect output file CRC") + validation_args.add_argument("input_file", help="File to use as base image and desired output") validation_args.add_argument("patch", help="Output patch file name") # Apply patch file @@ -1101,9 +1038,7 @@ def dump( patch_args.add_argument("output", help="File to write output to") # Dump patch instructions - dump_args = subparser.add_parser( - "dump", help="Dump patch file instructions to terminal" - ) + dump_args = subparser.add_parser("dump", help="Dump patch file instructions to terminal") dump_args.add_argument("patch", help="Patch file to dump") # Parse args @@ -1122,9 +1057,7 @@ def dump( f_output.write(patch) elif args.command == "validation": with open(args.input_file, "rb") as f_input: - patch = diff.validation( - f_input.read(-1), args.invalid_length, args.invalid_crc - ) + patch = diff.validation(f_input.read(-1), args.invalid_length, args.invalid_crc) with open(args.patch, "wb") as f_output: f_output.write(patch) elif args.command == "patch": diff --git a/src/infuse_iot/epacket/common.py b/src/infuse_iot/epacket/common.py index 9eb9186..b6499fd 100644 --- a/src/infuse_iot/epacket/common.py +++ b/src/infuse_iot/epacket/common.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from typing import Dict + from typing_extensions import Self diff --git a/src/infuse_iot/epacket/interface.py b/src/infuse_iot/epacket/interface.py index 8f4f396..feb26c6 100644 --- a/src/infuse_iot/epacket/interface.py +++ b/src/infuse_iot/epacket/interface.py @@ -2,13 +2,13 @@ import ctypes import enum - from typing import Dict + from typing_extensions import Self -from infuse_iot.util.ctypes import bytes_to_uint8 -from infuse_iot.epacket.common import Serializable import infuse_iot.generated.rpc_definitions as rpc_defs +from infuse_iot.epacket.common import Serializable +from infuse_iot.util.ctypes import bytes_to_uint8 class ID(enum.Enum): @@ -52,10 +52,7 @@ def __hash__(self) -> int: return (self.addr_type << 48) + self.addr_val def __eq__(self, another) -> bool: - return ( - self.addr_type == another.addr_type - and self.addr_val == another.addr_val - ) + return self.addr_type == another.addr_type and self.addr_val == another.addr_val def __str__(self) -> str: t = "random" if self.addr_type == 1 else "public" @@ -67,9 +64,7 @@ def len(self): def to_ctype(self) -> CtypesFormat: """Convert the address to the ctype format""" - return self.CtypesFormat( - self.addr_type, bytes_to_uint8(self.addr_val.to_bytes(6, "little")) - ) + return self.CtypesFormat(self.addr_type, bytes_to_uint8(self.addr_val.to_bytes(6, "little"))) def to_json(self) -> Dict: return {"i": "BT", "t": self.addr_type, "v": self.addr_val} @@ -81,9 +76,7 @@ def from_json(cls, values: Dict) -> Self: def to_rpc_struct(self) -> rpc_defs.rpc_struct_bt_addr_le: """Convert the address to the common RPC address structure""" - return rpc_defs.rpc_struct_bt_addr_le( - self.addr_type, bytes_to_uint8(self.addr_val.to_bytes(6, "little")) - ) + return rpc_defs.rpc_struct_bt_addr_le(self.addr_type, bytes_to_uint8(self.addr_val.to_bytes(6, "little"))) @classmethod def from_rpc_struct(cls, struct: rpc_defs.rpc_struct_bt_addr_le): diff --git a/src/infuse_iot/epacket/packet.py b/src/infuse_iot/epacket/packet.py index 8be1a6f..f15e98a 100644 --- a/src/infuse_iot/epacket/packet.py +++ b/src/infuse_iot/epacket/packet.py @@ -1,21 +1,21 @@ #!/usr/bin/env python3 +import base64 import ctypes import enum -import base64 -import time import random +import time +from typing import Any, Dict, List, Tuple -from typing import List, Dict, Tuple, Any from typing_extensions import Self from infuse_iot.common import InfuseType +from infuse_iot.database import DeviceDatabase, NoKeyError from infuse_iot.epacket.common import Serializable from infuse_iot.epacket.interface import ID as Interface from infuse_iot.epacket.interface import Address -from infuse_iot.util.crypto import chachapoly_decrypt, chachapoly_encrypt -from infuse_iot.database import DeviceDatabase, NoKeyError from infuse_iot.time import InfuseTime +from infuse_iot.util.crypto import chachapoly_decrypt, chachapoly_encrypt class Auth(enum.IntEnum): @@ -165,9 +165,7 @@ def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> List[Self # Decrypting packet if common_header.encrypted: try: - f_header, f_decrypted = frame_type.decrypt( - database, addr.val, packet_bytes - ) + f_header, f_decrypted = frame_type.decrypt(database, addr.val, packet_bytes) except NoKeyError: continue @@ -175,11 +173,7 @@ def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> List[Self f_header.device_id, common_header.interface, addr, - ( - Auth.DEVICE - if f_header.flags & Flags.ENCR_DEVICE - else Auth.NETWORK - ), + (Auth.DEVICE if f_header.flags & Flags.ENCR_DEVICE else Auth.NETWORK), f_header.key_metadata, f_header.gps_time, f_header.sequence, @@ -192,9 +186,7 @@ def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> List[Self ) else: # Extract payload metadata - decr_header = CtypePacketReceived.DecryptedHeader.from_buffer_copy( - packet_bytes - ) + decr_header = CtypePacketReceived.DecryptedHeader.from_buffer_copy(packet_bytes) del packet_bytes[: ctypes.sizeof(decr_header)] # Notify database of BT Addr -> Infuse ID mapping @@ -204,11 +196,7 @@ def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> List[Self decr_header.device_id, common_header.interface, addr, - ( - Auth.DEVICE - if decr_header.flags & Flags.ENCR_DEVICE - else Auth.NETWORK - ), + (Auth.DEVICE if decr_header.flags & Flags.ENCR_DEVICE else Auth.NETWORK), decr_header.key_id, decr_header.gps_time, decr_header.sequence, @@ -245,9 +233,7 @@ def to_serial(self, database: DeviceDatabase) -> bytes: assert bt_addr is not None # Forwarded payload - forward_payload = CtypeBtGattFrame.encrypt( - database, final.infuse_id, self.ptype, Auth.DEVICE, self.payload - ) + forward_payload = CtypeBtGattFrame.encrypt(database, final.infuse_id, self.ptype, Auth.DEVICE, self.payload) # Forwarding header forward_hdr = CtypeForwardHeaderBtGatt( @@ -293,9 +279,7 @@ def to_serial(self, database: DeviceDatabase) -> bytes: # Encrypt and return payload header_bytes = bytes(header) - ciphertext = chachapoly_encrypt( - key, header_bytes[:11], header_bytes[11:], payload - ) + ciphertext = chachapoly_encrypt(key, header_bytes[:11], header_bytes[11:], payload) return header_bytes + ciphertext def to_json(self) -> Dict: @@ -428,16 +412,12 @@ class CtypeBtAdvFrame(CtypeV0VersionedFrame): """Bluetooth Advertising packet header""" @classmethod - def decrypt( - cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr, frame: bytes - ): + def decrypt(cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr, frame: bytes): header = cls.from_buffer_copy(frame) if header.flags & Flags.ENCR_DEVICE: raise NotImplementedError else: - database.observe_device( - header.device_id, network_id=header.key_metadata, bt_addr=bt_addr - ) + database.observe_device(header.device_id, network_id=header.key_metadata, bt_addr=bt_addr) key = database.bt_adv_network_key(header.device_id, header.gps_time) decrypted = chachapoly_decrypt(key, frame[:11], frame[11:23], frame[23:]) @@ -484,25 +464,17 @@ def encrypt( # Encrypt and return payload header_bytes = bytes(header) - ciphertext = chachapoly_encrypt( - key, header_bytes[:11], header_bytes[11:], payload - ) + ciphertext = chachapoly_encrypt(key, header_bytes[:11], header_bytes[11:], payload) return header_bytes + ciphertext @classmethod - def decrypt( - cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr, frame: bytes - ): + def decrypt(cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr, frame: bytes): header = cls.from_buffer_copy(frame) if header.flags & Flags.ENCR_DEVICE: - database.observe_device( - header.device_id, device_id=header.key_metadata, bt_addr=bt_addr - ) + database.observe_device(header.device_id, device_id=header.key_metadata, bt_addr=bt_addr) key = database.bt_gatt_device_key(header.device_id, header.gps_time) else: - database.observe_device( - header.device_id, network_id=header.key_metadata, bt_addr=bt_addr - ) + database.observe_device(header.device_id, network_id=header.key_metadata, bt_addr=bt_addr) key = database.bt_gatt_network_key(header.device_id, header.gps_time) decrypted = chachapoly_decrypt(key, frame[:11], frame[11:23], frame[23:]) diff --git a/src/infuse_iot/generated/rpc_definitions.py b/src/infuse_iot/generated/rpc_definitions.py index b7a433b..671dad3 100644 --- a/src/infuse_iot/generated/rpc_definitions.py +++ b/src/infuse_iot/generated/rpc_definitions.py @@ -205,8 +205,7 @@ class request(ctypes.LittleEndianStructure): _pack_ = 1 class response(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 @@ -218,8 +217,7 @@ class time_get: COMMAND_ID = 3 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): @@ -245,8 +243,7 @@ class request(ctypes.LittleEndianStructure): _pack_ = 1 class response(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 @@ -345,8 +342,7 @@ class application_info: COMMAND_ID = 9 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): @@ -371,8 +367,7 @@ class wifi_scan: COMMAND_ID = 10 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): @@ -391,8 +386,7 @@ class wifi_state: COMMAND_ID = 11 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): @@ -411,8 +405,7 @@ class last_reboot: COMMAND_ID = 12 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): @@ -506,8 +499,7 @@ class lte_state: COMMAND_ID = 21 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): @@ -607,8 +599,7 @@ class request(ctypes.LittleEndianStructure): _pack_ = 1 class response(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 @@ -644,13 +635,11 @@ class data_sender: COMMAND_ID = 32765 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 @@ -662,8 +651,7 @@ class data_receiver: COMMAND_ID = 32766 class request(ctypes.LittleEndianStructure): - _fields_ = [ - ] + _fields_ = [] _pack_ = 1 class response(ctypes.LittleEndianStructure): @@ -692,4 +680,3 @@ class response(ctypes.LittleEndianStructure): ("array", 0 * ctypes.c_uint8), ] _pack_ = 1 - diff --git a/src/infuse_iot/generated/tdf_base.py b/src/infuse_iot/generated/tdf_base.py index 6c65d14..c52eddb 100644 --- a/src/infuse_iot/generated/tdf_base.py +++ b/src/infuse_iot/generated/tdf_base.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 import ctypes +from typing import Any, Generator, cast -from typing import Generator, Any -from typing_extensions import Self, cast +from typing_extensions import Self def _public_name(internal_field): @@ -13,9 +13,7 @@ def _public_name(internal_field): class TdfField: - def __init__( - self, field: str, subfield: str | None, postfix: str, display_fmt: str, val: Any - ): + def __init__(self, field: str, subfield: str | None, postfix: str, display_fmt: str, val: Any): self.field = field self.subfield = subfield self.postfix = postfix diff --git a/src/infuse_iot/generated/tdf_definitions.py b/src/infuse_iot/generated/tdf_definitions.py index 2241e38..1d89ab2 100644 --- a/src/infuse_iot/generated/tdf_definitions.py +++ b/src/infuse_iot/generated/tdf_definitions.py @@ -3,10 +3,9 @@ """Autogenerated TDF decoding logic""" import ctypes - from typing import Dict -from infuse_iot.generated.tdf_base import TdfStructBase, TdfReadingBase +from infuse_iot.generated.tdf_base import TdfReadingBase, TdfStructBase class structs: diff --git a/src/infuse_iot/rpc_wrappers/application_info.py b/src/infuse_iot/rpc_wrappers/application_info.py index 7ce8bcd..74bef6d 100644 --- a/src/infuse_iot/rpc_wrappers/application_info.py +++ b/src/infuse_iot/rpc_wrappers/application_info.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class application_info(InfuseRpcCommand, defs.application_info): diff --git a/src/infuse_iot/rpc_wrappers/bt_connect_infuse.py b/src/infuse_iot/rpc_wrappers/bt_connect_infuse.py index 04bcb99..8e485df 100644 --- a/src/infuse_iot/rpc_wrappers/bt_connect_infuse.py +++ b/src/infuse_iot/rpc_wrappers/bt_connect_infuse.py @@ -1,41 +1,31 @@ #!/usr/bin/env python3 +import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.generated.rpc_definitions import ( - rpc_struct_bt_addr_le, rpc_enum_bt_le_addr_type, rpc_enum_infuse_bt_characteristic, + rpc_struct_bt_addr_le, ) from infuse_iot.util.argparse import BtLeAddress from infuse_iot.util.ctypes import bytes_to_uint8 -import infuse_iot.generated.rpc_definitions as defs class bt_connect_infuse(InfuseRpcCommand, defs.bt_connect_infuse): @classmethod def add_parser(cls, parser): - parser.add_argument( - "--timeout", type=int, default=5000, help="Connection timeout (ms)" - ) - parser.add_argument( - "--inactivity", type=int, default=0, help="Data inactivity timeout (ms)" - ) - parser.add_argument( - "--data", action="store_true", help="Subscribe to data characteristic" - ) + parser.add_argument("--timeout", type=int, default=5000, help="Connection timeout (ms)") + parser.add_argument("--inactivity", type=int, default=0, help="Data inactivity timeout (ms)") + parser.add_argument("--data", action="store_true", help="Subscribe to data characteristic") parser.add_argument( "--logging", action="store_true", help="Subscribe to serial logging characteristic", ) addr_group = parser.add_mutually_exclusive_group(required=True) - addr_group.add_argument( - "--public", type=BtLeAddress, help="Public Bluetooth address" - ) - addr_group.add_argument( - "--random", type=BtLeAddress, help="Random Bluetooth address" - ) + addr_group.add_argument("--public", type=BtLeAddress, help="Public Bluetooth address") + addr_group.add_argument("--random", type=BtLeAddress, help="Random Bluetooth address") def __init__(self, args): self.args = args diff --git a/src/infuse_iot/rpc_wrappers/bt_disconnect.py b/src/infuse_iot/rpc_wrappers/bt_disconnect.py index d3e75cb..1f6926d 100644 --- a/src/infuse_iot/rpc_wrappers/bt_disconnect.py +++ b/src/infuse_iot/rpc_wrappers/bt_disconnect.py @@ -1,26 +1,22 @@ #!/usr/bin/env python3 +import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.generated.rpc_definitions import ( - rpc_struct_bt_addr_le, rpc_enum_bt_le_addr_type, + rpc_struct_bt_addr_le, ) from infuse_iot.util.argparse import BtLeAddress from infuse_iot.util.ctypes import bytes_to_uint8 -import infuse_iot.generated.rpc_definitions as defs class bt_disconnect(InfuseRpcCommand, defs.bt_disconnect): @classmethod def add_parser(cls, parser): addr_group = parser.add_mutually_exclusive_group(required=True) - addr_group.add_argument( - "--public", type=BtLeAddress, help="Public Bluetooth address" - ) - addr_group.add_argument( - "--random", type=BtLeAddress, help="Random Bluetooth address" - ) + addr_group.add_argument("--public", type=BtLeAddress, help="Public Bluetooth address") + addr_group.add_argument("--random", type=BtLeAddress, help="Random Bluetooth address") def __init__(self, args): self.args = args diff --git a/src/infuse_iot/rpc_wrappers/coap_download.py b/src/infuse_iot/rpc_wrappers/coap_download.py index d4ae48c..3752a00 100644 --- a/src/infuse_iot/rpc_wrappers/coap_download.py +++ b/src/infuse_iot/rpc_wrappers/coap_download.py @@ -2,9 +2,9 @@ import ctypes +import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.generated.rpc_definitions import rpc_enum_file_action -import infuse_iot.generated.rpc_definitions as defs class coap_download(InfuseRpcCommand, defs.coap_download): diff --git a/src/infuse_iot/rpc_wrappers/fault.py b/src/infuse_iot/rpc_wrappers/fault.py index b9cadcb..f201d89 100644 --- a/src/infuse_iot/rpc_wrappers/fault.py +++ b/src/infuse_iot/rpc_wrappers/fault.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class fault(InfuseRpcCommand, defs.fault): diff --git a/src/infuse_iot/rpc_wrappers/file_write_basic.py b/src/infuse_iot/rpc_wrappers/file_write_basic.py index 5cbf00b..f6eb557 100644 --- a/src/infuse_iot/rpc_wrappers/file_write_basic.py +++ b/src/infuse_iot/rpc_wrappers/file_write_basic.py @@ -3,14 +3,14 @@ import binascii from rich.progress import ( + DownloadColumn, Progress, TransferSpeedColumn, - DownloadColumn, ) -from infuse_iot.commands import InfuseRpcCommand, Auth -from infuse_iot.generated.rpc_definitions import rpc_enum_file_action import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import Auth, InfuseRpcCommand +from infuse_iot.generated.rpc_definitions import rpc_enum_file_action class file_write_basic(InfuseRpcCommand, defs.file_write_basic): diff --git a/src/infuse_iot/rpc_wrappers/kv_read.py b/src/infuse_iot/rpc_wrappers/kv_read.py index 2d879b9..9a1d436 100644 --- a/src/infuse_iot/rpc_wrappers/kv_read.py +++ b/src/infuse_iot/rpc_wrappers/kv_read.py @@ -3,8 +3,8 @@ import ctypes import errno -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class kv_read(InfuseRpcCommand, defs.kv_read): @@ -47,9 +47,7 @@ class kv_store_value(ctypes.LittleEndianStructure): @classmethod def add_parser(cls, parser): - parser.add_argument( - "--keys", "-k", required=True, type=int, nargs="+", help="Keys to read" - ) + parser.add_argument("--keys", "-k", required=True, type=int, nargs="+", help="Keys to read") def __init__(self, args): self.keys = args.keys diff --git a/src/infuse_iot/rpc_wrappers/last_reboot.py b/src/infuse_iot/rpc_wrappers/last_reboot.py index baecf23..b66e1a6 100644 --- a/src/infuse_iot/rpc_wrappers/last_reboot.py +++ b/src/infuse_iot/rpc_wrappers/last_reboot.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class last_reboot(InfuseRpcCommand, defs.last_reboot): @@ -26,9 +26,7 @@ def handle_response(self, return_code, response): print(f"\t Reason: {response.reason}") print(f"\t Hardware: 0x{response.hardware_flags:08x}") - print( - f"\tReboot Time: {InfuseTime.utc_time_string(t_remote)} ({InfuseTimeSource(response.epoch_time_source)})" - ) + print(f"\tReboot Time: {InfuseTime.utc_time_string(t_remote)} ({InfuseTimeSource(response.epoch_time_source)})") print(f"\t Uptime: {response.uptime}") print(f"\t Param 1: 0x{response.param_1:08x}") print(f"\t Param 2: 0x{response.param_2:08x}") diff --git a/src/infuse_iot/rpc_wrappers/lte_at_cmd.py b/src/infuse_iot/rpc_wrappers/lte_at_cmd.py index 7d795ea..b8df6ce 100644 --- a/src/infuse_iot/rpc_wrappers/lte_at_cmd.py +++ b/src/infuse_iot/rpc_wrappers/lte_at_cmd.py @@ -3,8 +3,8 @@ import ctypes import errno -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class lte_at_cmd(InfuseRpcCommand, defs.lte_at_cmd): diff --git a/src/infuse_iot/rpc_wrappers/lte_modem_info.py b/src/infuse_iot/rpc_wrappers/lte_modem_info.py index 2824a1f..a3a7482 100644 --- a/src/infuse_iot/rpc_wrappers/lte_modem_info.py +++ b/src/infuse_iot/rpc_wrappers/lte_modem_info.py @@ -2,8 +2,8 @@ import ctypes -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand from . import kv_read, lte_pdp_ctx @@ -34,7 +34,7 @@ def handle_response(self, return_code, response): print(f"Failed to query modem info ({return_code})") return - unknown = "_unknown".encode("utf-8") + unknown = b"_unknown" modem_model = bytes(response[0].data) if response[0].len > 0 else unknown modem_firmware = bytes(response[1].data) if response[1].len > 0 else unknown modem_esn = bytes(response[2].data) if response[2].len > 0 else unknown diff --git a/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py b/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py index f9cac7f..f50e07f 100644 --- a/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py +++ b/src/infuse_iot/rpc_wrappers/lte_pdp_ctx.py @@ -3,8 +3,8 @@ import ctypes import enum -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class lte_pdp_ctx(InfuseRpcCommand, defs.kv_write): @@ -38,15 +38,11 @@ class kv_store_value(ctypes.LittleEndianStructure): ] _pack_ = 1 - return kv_store_value( - id, len(value_bytes), (ctypes.c_ubyte * len(value_bytes))(*value_bytes) - ) + return kv_store_value(id, len(value_bytes), (ctypes.c_ubyte * len(value_bytes))(*value_bytes)) @classmethod def add_parser(cls, parser): - parser.add_argument( - "--apn", "-a", type=str, required=True, help="Access Point Name" - ) + parser.add_argument("--apn", "-a", type=str, required=True, help="Access Point Name") def __init__(self, args): self.args = args diff --git a/src/infuse_iot/rpc_wrappers/lte_state.py b/src/infuse_iot/rpc_wrappers/lte_state.py index 12746cd..df0cf6a 100644 --- a/src/infuse_iot/rpc_wrappers/lte_state.py +++ b/src/infuse_iot/rpc_wrappers/lte_state.py @@ -3,10 +3,10 @@ import ctypes import ipaddress +import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand -from infuse_iot.zephyr import net_if as z_nif from infuse_iot.zephyr import lte as z_lte -import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.zephyr import net_if as z_nif class interface_state(ctypes.LittleEndianStructure): @@ -114,15 +114,9 @@ def handle_response(self, return_code, response): else: freq_string = "" country = z_lte.MobileCountryCodes.name_from_mcc(lte.mcc) - active_str = ( - f"{lte.psm_active_time} s" if lte.psm_active_time != 65535 else "N/A" - ) - edrx_interval_str = ( - f"{lte.edrx_interval} s" if lte.edrx_interval != -1.0 else "N/A" - ) - edrx_window_str = ( - f"{lte.edrx_window} s" if lte.edrx_window != -1.0 else "N/A" - ) + active_str = f"{lte.psm_active_time} s" if lte.psm_active_time != 65535 else "N/A" + edrx_interval_str = f"{lte.edrx_interval} s" if lte.edrx_interval != -1.0 else "N/A" + edrx_window_str = f"{lte.edrx_window} s" if lte.edrx_window != -1.0 else "N/A" print(f"\t Access Tech: {lte.access_technology}") print(f"\t Country Code: {lte.mcc} ({country})") print(f"\t Network Code: {lte.mnc}") diff --git a/src/infuse_iot/rpc_wrappers/reboot.py b/src/infuse_iot/rpc_wrappers/reboot.py index f943ae6..d003183 100644 --- a/src/infuse_iot/rpc_wrappers/reboot.py +++ b/src/infuse_iot/rpc_wrappers/reboot.py @@ -1,15 +1,13 @@ #!/usr/bin/env python3 -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class reboot(InfuseRpcCommand, defs.reboot): @classmethod def add_parser(cls, parser): - parser.add_argument( - "--delay", type=int, default=0, help="Delay until reboot (ms)" - ) + parser.add_argument("--delay", type=int, default=0, help="Delay until reboot (ms)") def __init__(self, args): self._delay_ms = args.delay diff --git a/src/infuse_iot/rpc_wrappers/security_state.py b/src/infuse_iot/rpc_wrappers/security_state.py index cd8b914..24880b7 100644 --- a/src/infuse_iot/rpc_wrappers/security_state.py +++ b/src/infuse_iot/rpc_wrappers/security_state.py @@ -3,16 +3,16 @@ import ctypes import random -from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import x25519 -from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.hazmat.primitives.kdf.hkdf import HKDF -from infuse_iot.util.argparse import ValidFile -from infuse_iot.util.ctypes import bytes_to_uint8 +import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.epacket.packet import Auth -import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.util.argparse import ValidFile +from infuse_iot.util.ctypes import bytes_to_uint8 class challenge_response_header(ctypes.LittleEndianStructure): @@ -48,9 +48,7 @@ class challenge_response_basic(ctypes.LittleEndianStructure): class security_state(InfuseRpcCommand, defs.security_state): @classmethod def add_parser(cls, parser): - parser.add_argument( - "--pem", type=ValidFile, help="Cloud .pem file for identity validation" - ) + parser.add_argument("--pem", type=ValidFile, help="Cloud .pem file for identity validation") def __init__(self, args): self.challenge = bytes_to_uint8(random.randbytes(16)) @@ -67,12 +65,8 @@ def _decrypt_response(self, response): rb = bytes(response.response) with self.pem.open("r") as f: - cloud_private_key = serialization.load_pem_private_key( - f.read().encode("utf-8"), password=None - ) - device_public_key = x25519.X25519PublicKey.from_public_bytes( - bytes(response.device_public_key) - ) + cloud_private_key = serialization.load_pem_private_key(f.read().encode("utf-8"), password=None) + device_public_key = x25519.X25519PublicKey.from_public_bytes(bytes(response.device_public_key)) shared_secret = cloud_private_key.exchange(device_public_key) hkdf = HKDF( algorithm=hashes.SHA256(), diff --git a/src/infuse_iot/rpc_wrappers/time_get.py b/src/infuse_iot/rpc_wrappers/time_get.py index 327250d..ae75420 100644 --- a/src/infuse_iot/rpc_wrappers/time_get.py +++ b/src/infuse_iot/rpc_wrappers/time_get.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class time_get(InfuseRpcCommand, defs.time_get): @@ -20,16 +20,13 @@ def handle_response(self, return_code, response): print(f"Failed to query current time ({return_code})") return - from infuse_iot.time import InfuseTime, InfuseTimeSource import time + from infuse_iot.time import InfuseTime, InfuseTimeSource + t_remote = InfuseTime.unix_time_from_epoch(response.epoch_time) t_local = time.time() - sync_age = ( - f"{response.sync_age} seconds ago" - if response.sync_age != 2**32 - 1 - else "Never" - ) + sync_age = f"{response.sync_age} seconds ago" if response.sync_age != 2**32 - 1 else "Never" print(f"\t Source: {InfuseTimeSource(response.time_source)}") print(f"\tRemote Time: {InfuseTime.utc_time_string(t_remote)}") diff --git a/src/infuse_iot/rpc_wrappers/time_set.py b/src/infuse_iot/rpc_wrappers/time_set.py index 4b45a6f..78b8ac5 100644 --- a/src/infuse_iot/rpc_wrappers/time_set.py +++ b/src/infuse_iot/rpc_wrappers/time_set.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class time_set(InfuseRpcCommand, defs.time_set): @@ -13,9 +13,10 @@ def __init__(self, args): pass def request_struct(self): - from infuse_iot.time import InfuseTime import time + from infuse_iot.time import InfuseTime + return self.request(InfuseTime.epoch_time_from_unix(time.time())) def handle_response(self, return_code, response): diff --git a/src/infuse_iot/rpc_wrappers/wifi_configure.py b/src/infuse_iot/rpc_wrappers/wifi_configure.py index b7181dc..8f45ee6 100644 --- a/src/infuse_iot/rpc_wrappers/wifi_configure.py +++ b/src/infuse_iot/rpc_wrappers/wifi_configure.py @@ -2,8 +2,8 @@ import ctypes -from infuse_iot.commands import InfuseRpcCommand import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand class wifi_configure(InfuseRpcCommand, defs.kv_write): @@ -45,12 +45,8 @@ def request_struct(self): ssid_bytes = self.args.ssid.encode("utf-8") + b"\x00" psk_bytes = self.args.psk.encode("utf-8") + b"\x00" - ssid_struct = self.kv_store_value_factory( - 20, (len(ssid_bytes) + 1).to_bytes(1, "little") + ssid_bytes - ) - psk_struct = self.kv_store_value_factory( - 21, (len(psk_bytes) + 1).to_bytes(1, "little") + psk_bytes - ) + ssid_struct = self.kv_store_value_factory(20, (len(ssid_bytes) + 1).to_bytes(1, "little") + ssid_bytes) + psk_struct = self.kv_store_value_factory(21, (len(psk_bytes) + 1).to_bytes(1, "little") + psk_bytes) request_bytes = bytes(ssid_struct) + bytes(psk_struct) return bytes(self.request(2)) + request_bytes diff --git a/src/infuse_iot/rpc_wrappers/wifi_scan.py b/src/infuse_iot/rpc_wrappers/wifi_scan.py index db86cb2..4fafce4 100644 --- a/src/infuse_iot/rpc_wrappers/wifi_scan.py +++ b/src/infuse_iot/rpc_wrappers/wifi_scan.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 import ctypes + import tabulate +import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.zephyr import wifi as z_wifi -import infuse_iot.generated.rpc_definitions as defs class wifi_scan(InfuseRpcCommand, defs.wifi_scan): diff --git a/src/infuse_iot/rpc_wrappers/wifi_state.py b/src/infuse_iot/rpc_wrappers/wifi_state.py index c13fdf9..f8a7333 100644 --- a/src/infuse_iot/rpc_wrappers/wifi_state.py +++ b/src/infuse_iot/rpc_wrappers/wifi_state.py @@ -3,10 +3,10 @@ import ctypes import ipaddress +import infuse_iot.generated.rpc_definitions as defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.zephyr import net_if as z_nif from infuse_iot.zephyr import wifi as z_wifi -import infuse_iot.generated.rpc_definitions as defs class interface_state(ctypes.LittleEndianStructure): diff --git a/src/infuse_iot/rpc_wrappers/zbus_channel_state.py b/src/infuse_iot/rpc_wrappers/zbus_channel_state.py index 9d705ac..3c96bba 100644 --- a/src/infuse_iot/rpc_wrappers/zbus_channel_state.py +++ b/src/infuse_iot/rpc_wrappers/zbus_channel_state.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 import ctypes + import tabulate +import infuse_iot.generated.rpc_definitions as rpc_defs from infuse_iot.commands import InfuseRpcCommand from infuse_iot.tdf import tdf_definitions as defs -import infuse_iot.generated.rpc_definitions as rpc_defs class zbus_channel_state(InfuseRpcCommand, rpc_defs.zbus_channel_state): diff --git a/src/infuse_iot/serial_comms.py b/src/infuse_iot/serial_comms.py index 21f99d2..3d8312a 100644 --- a/src/infuse_iot/serial_comms.py +++ b/src/infuse_iot/serial_comms.py @@ -1,14 +1,15 @@ #!/usr/bin/env python3 -import serial -import pylink import time +import pylink +import serial + class SerialFrame: """Serial frame reconstructor""" - SYNC = b"\xD5\xCA" + SYNC = b"\xd5\xca" @classmethod def reconstructor(cls): @@ -56,7 +57,7 @@ def read_bytes(self, num): def ping(self): """Magic 1 byte frame to request a response""" - self._ser.write(SerialFrame.SYNC + b"\x01\x00" + b"\x4D") + self._ser.write(SerialFrame.SYNC + b"\x01\x00" + b"\x4d") self._ser.flush() def write(self, packet: bytes): @@ -117,7 +118,7 @@ def read_bytes(self, num): def ping(self): """Magic 1 byte frame to request a response""" - self._jlink.rtt_write(0, SerialFrame.SYNC + b"\x01\x00" + b"\x4D") + self._jlink.rtt_write(0, SerialFrame.SYNC + b"\x01\x00" + b"\x4d") def write(self, packet: bytes): """Write a serial frame to the port""" diff --git a/src/infuse_iot/socket_comms.py b/src/infuse_iot/socket_comms.py index 1ee01ab..7bb721e 100644 --- a/src/infuse_iot/socket_comms.py +++ b/src/infuse_iot/socket_comms.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 +import enum +import json import socket import struct -import json -import enum +from typing import Any, Dict -from typing import Dict, Any from typing_extensions import Self -from infuse_iot.epacket.packet import PacketReceived, PacketOutput +from infuse_iot.epacket.packet import PacketOutput, PacketReceived def default_multicast_address(): @@ -44,11 +44,11 @@ def to_json(self) -> Dict: @classmethod def from_json(cls, values: Dict) -> Self: """Reconstruct class from json dictionary""" - if j := values.get("epacket", None): + if j := values.get("epacket"): epacket = PacketReceived.from_json(j) else: epacket = None - connection_id = values.get("connection_id", None) + connection_id = values.get("connection_id") return cls( notification_type=cls.Type(values["type"]), @@ -85,11 +85,11 @@ def to_json(self) -> Dict: @classmethod def from_json(cls, values: Dict) -> Self: """Reconstruct class from json dictionary""" - if j := values.get("epacket", None): + if j := values.get("epacket"): epacket = PacketOutput.from_json(j) else: epacket = None - connection_id = values.get("connection_id", None) + connection_id = values.get("connection_id") return cls( notification_type=cls.Type(values["type"]), @@ -101,23 +101,17 @@ def from_json(cls, values: Dict) -> Self: class LocalServer: def __init__(self, multicast_address): # Multicast output socket - self._output_sock = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) + self._output_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self._output_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) self._output_addr = multicast_address # Single input socket unicast_address = ("localhost", multicast_address[1] + 1) - self._input_sock = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) + self._input_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self._input_sock.bind(unicast_address) self._input_sock.settimeout(0.2) def broadcast(self, notification: ClientNotification): - self._output_sock.sendto( - json.dumps(notification.to_json()).encode("utf-8"), self._output_addr - ) + self._output_sock.sendto(json.dumps(notification.to_json()).encode("utf-8"), self._output_addr) def receive(self) -> GatewayRequest | None: try: @@ -134,21 +128,15 @@ def close(self): class LocalClient: def __init__(self, multicast_address, rx_timeout=0.2): # Multicast input socket - self._input_sock = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) + self._input_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self._input_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._input_sock.bind(multicast_address) - mreq = struct.pack( - "4sl", socket.inet_aton(multicast_address[0]), socket.INADDR_ANY - ) + mreq = struct.pack("4sl", socket.inet_aton(multicast_address[0]), socket.INADDR_ANY) self._input_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) self._input_sock.settimeout(rx_timeout) # Unicast output socket self._output_addr = ("localhost", multicast_address[1] + 1) - self._output_sock = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) + self._output_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # Connection context self._connection_id = None @@ -156,9 +144,7 @@ def set_rx_timeout(self, timeout): self._input_sock.settimeout(timeout) def send(self, request: GatewayRequest): - self._output_sock.sendto( - json.dumps(request.to_json()).encode("utf-8"), self._output_addr - ) + self._output_sock.sendto(json.dumps(request.to_json()).encode("utf-8"), self._output_addr) def receive(self) -> ClientNotification | None: try: @@ -171,9 +157,7 @@ def connection_create(self, infuse_id: int): self._connection_id = infuse_id # Send the request for the connection - req = GatewayRequest( - GatewayRequest.Type.CONNECTION_REQUEST, connection_id=infuse_id - ) + req = GatewayRequest(GatewayRequest.Type.CONNECTION_REQUEST, connection_id=infuse_id) self.send(req) # Wait for response from the server while rsp := self.receive(): diff --git a/src/infuse_iot/tdf.py b/src/infuse_iot/tdf.py index de275d2..08c76bb 100644 --- a/src/infuse_iot/tdf.py +++ b/src/infuse_iot/tdf.py @@ -2,11 +2,10 @@ import ctypes import enum +from typing import Generator, List, Type -from typing import List, Generator, Type - +from infuse_iot.generated import tdf_base, tdf_definitions from infuse_iot.time import InfuseTime -from infuse_iot.generated import tdf_definitions, tdf_base class TDF: @@ -113,8 +112,7 @@ def decode(self, buffer: bytes) -> Generator[Reading, None, None]: assert buffer_time is not None time = InfuseTime.unix_time_from_epoch(buffer_time) data = [ - id_type.from_buffer_consume(total_data[x : x + header.len]) - for x in range(0, total_len, header.len) + id_type.from_buffer_consume(total_data[x : x + header.len]) for x in range(0, total_len, header.len) ] else: data_bytes = buffer[: header.len] diff --git a/src/infuse_iot/time.py b/src/infuse_iot/time.py index 0a6bc4a..076c310 100644 --- a/src/infuse_iot/time.py +++ b/src/infuse_iot/time.py @@ -51,9 +51,7 @@ def epoch_time_from_unix(cls, unix_time: float) -> int: @classmethod def utc_time_string(cls, unix_time: float) -> str: # Trim timezone prefix and microseconds - return str(datetime.datetime.fromtimestamp(unix_time, datetime.timezone.utc))[ - :-9 - ] + return str(datetime.datetime.fromtimestamp(unix_time, datetime.timezone.utc))[:-9] @classmethod def utc_time_string_log(cls, unix_time: float) -> str: diff --git a/src/infuse_iot/tools/cloud.py b/src/infuse_iot/tools/cloud.py index b0ee1d4..b9a74a6 100644 --- a/src/infuse_iot/tools/cloud.py +++ b/src/infuse_iot/tools/cloud.py @@ -7,18 +7,19 @@ from http import HTTPStatus from json import loads + from tabulate import tabulate from infuse_iot.api_client import Client from infuse_iot.api_client.api.default import ( - get_all_organisations, + create_board, create_organisation, + get_all_organisations, get_boards, - create_board, ) -from infuse_iot.api_client.models import NewOrganisation, NewBoard -from infuse_iot.credentials import get_api_key +from infuse_iot.api_client.models import NewBoard, NewOrganisation from infuse_iot.commands import InfuseCommand +from infuse_iot.credentials import get_api_key class CloudSubCommand: @@ -30,9 +31,7 @@ def run(self): def client(self): """Get API client object ready to use""" - return Client(base_url="https://api.dev.infuse-iot.com").with_headers( - {"x-api-key": f"Bearer {get_api_key()}"} - ) + return Client(base_url="https://api.dev.infuse-iot.com").with_headers({"x-api-key": f"Bearer {get_api_key()}"}) class Organisations(CloudSubCommand): @@ -41,9 +40,7 @@ def add_parser(cls, parser): parser_orgs = parser.add_parser("orgs", help="Infuse-IoT organisations") parser_orgs.set_defaults(command_class=cls) - tool_parser = parser_orgs.add_subparsers( - title="commands", metavar="", required=True - ) + tool_parser = parser_orgs.add_subparsers(title="commands", metavar="", required=True) list_parser = tool_parser.add_parser("list") list_parser.set_defaults(command_fn=cls.list) @@ -86,31 +83,19 @@ def create(self, client): class Boards(CloudSubCommand): @classmethod def add_parser(cls, parser): - parser_boards = parser.add_parser( - "boards", help="Infuse-IoT hardware platforms" - ) + parser_boards = parser.add_parser("boards", help="Infuse-IoT hardware platforms") parser_boards.set_defaults(command_class=cls) - tool_parser = parser_boards.add_subparsers( - title="commands", metavar="", required=True - ) + tool_parser = parser_boards.add_subparsers(title="commands", metavar="", required=True) list_parser = tool_parser.add_parser("list") list_parser.set_defaults(command_fn=cls.list) create_parser = tool_parser.add_parser("create") - create_parser.add_argument( - "--name", "-n", type=str, required=True, help="New board name" - ) - create_parser.add_argument( - "--org", "-o", type=str, required=True, help="Organisation ID" - ) - create_parser.add_argument( - "--soc", "-s", type=str, required=True, help="Board system on chip" - ) - create_parser.add_argument( - "--desc", "-d", type=str, required=True, help="Board description" - ) + create_parser.add_argument("--name", "-n", type=str, required=True, help="New board name") + create_parser.add_argument("--org", "-o", type=str, required=True, help="Organisation ID") + create_parser.add_argument("--soc", "-s", type=str, required=True, help="Board system on chip") + create_parser.add_argument("--desc", "-d", type=str, required=True, help="Board description") create_parser.set_defaults(command_fn=cls.create) def run(self): @@ -158,9 +143,7 @@ class SubCommand(InfuseCommand): @classmethod def add_parser(cls, parser): - subparser = parser.add_subparsers( - title="commands", metavar="", required=True - ) + subparser = parser.add_subparsers(title="commands", metavar="", required=True) Organisations.add_parser(subparser) Boards.add_parser(subparser) diff --git a/src/infuse_iot/tools/credentials.py b/src/infuse_iot/tools/credentials.py index b9071d4..266bebd 100644 --- a/src/infuse_iot/tools/credentials.py +++ b/src/infuse_iot/tools/credentials.py @@ -7,10 +7,9 @@ import yaml -from infuse_iot.util.argparse import ValidFile -from infuse_iot.commands import InfuseCommand - from infuse_iot import credentials +from infuse_iot.commands import InfuseCommand +from infuse_iot.util.argparse import ValidFile class SubCommand(InfuseCommand): @@ -21,9 +20,7 @@ class SubCommand(InfuseCommand): @classmethod def add_parser(cls, parser): parser.add_argument("--api-key", type=str, help="Set Infuse-IoT API key") - parser.add_argument( - "--network", type=ValidFile, help="Load network credentials from file" - ) + parser.add_argument("--network", type=ValidFile, help="Load network credentials from file") def __init__(self, args): self.args = args diff --git a/src/infuse_iot/tools/csv_annotate.py b/src/infuse_iot/tools/csv_annotate.py index f6b9a9a..b4a614f 100644 --- a/src/infuse_iot/tools/csv_annotate.py +++ b/src/infuse_iot/tools/csv_annotate.py @@ -5,8 +5,8 @@ __author__ = "Jordan Yates" __copyright__ = "Copyright 2024, Embeint Inc" -from infuse_iot.util.argparse import ValidFile from infuse_iot.commands import InfuseCommand +from infuse_iot.util.argparse import ValidFile class SubCommand(InfuseCommand): @@ -17,9 +17,7 @@ class SubCommand(InfuseCommand): @classmethod def add_parser(cls, parser): parser.add_argument("--file", "-f", required=True, type=ValidFile) - parser.add_argument( - "--default", "-d", type=str, default="N/A", help="Default label" - ) + parser.add_argument("--default", "-d", type=str, default="N/A", help="Default label") def __init__(self, args): self.file = args.file @@ -27,8 +25,8 @@ def __init__(self, args): self.selection = [] def make_plots(self): - from plotly.subplots import make_subplots import plotly.graph_objects as go + from plotly.subplots import make_subplots fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.2) for col in self.df.columns.values[1:]: @@ -41,8 +39,8 @@ def make_plots(self): return fig def run(self): - from dash import Dash, dcc, html, Input, Output, State, callback import pandas as pd + from dash import Dash, Input, Output, State, callback, dcc, html # Read data, add label column self.df = pd.read_csv(self.file, parse_dates=["time"]) @@ -93,9 +91,7 @@ def label_remove(n_clicks, value): @callback(Input("graph", "relayoutData")) def store_relayout_data(relayoutData): - if relayoutData.get("autosize", False) or relayoutData.get( - "xaxis.autorange", False - ): + if relayoutData.get("autosize", False) or relayoutData.get("xaxis.autorange", False): self.selection = [ self.df["time"][0], self.df["time"][self.df.shape[0] - 1], @@ -114,8 +110,7 @@ def store_relayout_data(relayoutData): ) def label_current_selection(n_clicks, value): self.df.loc[ - (self.df["time"] >= self.selection[0]) - & (self.df["time"] <= self.selection[1]), + (self.df["time"] >= self.selection[0]) & (self.df["time"] <= self.selection[1]), "labels", ] = value return self.make_plots() diff --git a/src/infuse_iot/tools/csv_plot.py b/src/infuse_iot/tools/csv_plot.py index 21912cc..83b9d4f 100644 --- a/src/infuse_iot/tools/csv_plot.py +++ b/src/infuse_iot/tools/csv_plot.py @@ -5,8 +5,8 @@ __author__ = "Jordan Yates" __copyright__ = "Copyright 2024, Embeint Inc" -from infuse_iot.util.argparse import ValidFile from infuse_iot.commands import InfuseCommand +from infuse_iot.util.argparse import ValidFile class SubCommand(InfuseCommand): @@ -17,18 +17,16 @@ class SubCommand(InfuseCommand): @classmethod def add_parser(cls, parser): parser.add_argument("--file", "-f", required=True, type=ValidFile) - parser.add_argument( - "--start", type=str, default="2024-01-01", help="Display data after" - ) + parser.add_argument("--start", type=str, default="2024-01-01", help="Display data after") def __init__(self, args): self.file = args.file self.start = args.start def run(self): - from dash import Dash, dcc, html import pandas as pd import plotly.express as px + from dash import Dash, dcc, html df = pd.read_csv(self.file) diff --git a/src/infuse_iot/tools/gateway.py b/src/infuse_iot/tools/gateway.py index 364c2fd..2681f2b 100644 --- a/src/infuse_iot/tools/gateway.py +++ b/src/infuse_iot/tools/gateway.py @@ -6,45 +6,43 @@ __copyright__ = "Copyright 2024, Embeint Inc" import argparse -import time -import threading -import queue +import base64 import ctypes +import io +import queue import random +import threading +import time +from typing import Callable, Dict + import cryptography import cryptography.exceptions -import io -import base64 - -from typing import Dict, Callable -from infuse_iot.util.argparse import ValidFile -from infuse_iot.util.console import Console -from infuse_iot.util.threading import SignaledThread -from infuse_iot.common import InfuseType, InfuseID +import infuse_iot.epacket.interface as interface +import infuse_iot.generated.rpc_definitions as defs +from infuse_iot import rpc from infuse_iot.commands import InfuseCommand -from infuse_iot.serial_comms import RttPort, SerialPort, SerialFrame -from infuse_iot.socket_comms import ( - LocalServer, - ClientNotification, - GatewayRequest, - default_multicast_address, -) +from infuse_iot.common import InfuseID, InfuseType from infuse_iot.database import ( DeviceDatabase, NoKeyError, ) - from infuse_iot.epacket.packet import ( Auth, - PacketReceived, - PacketOutputRouted, HopOutput, + PacketOutputRouted, + PacketReceived, ) -import infuse_iot.epacket.interface as interface - -from infuse_iot import rpc -import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.serial_comms import RttPort, SerialFrame, SerialPort +from infuse_iot.socket_comms import ( + ClientNotification, + GatewayRequest, + LocalServer, + default_multicast_address, +) +from infuse_iot.util.argparse import ValidFile +from infuse_iot.util.console import Console +from infuse_iot.util.threading import SignaledThread class LocalRpcServer: @@ -80,9 +78,7 @@ def handle(self, pkt: PacketReceived): # Was this a BT connect response with key information? if header.command_id == defs.bt_connect_infuse.COMMAND_ID: - resp = defs.bt_connect_infuse.response.from_buffer_copy( - pkt.payload[ctypes.sizeof(header) :] - ) + resp = defs.bt_connect_infuse.response.from_buffer_copy(pkt.payload[ctypes.sizeof(header) :]) if_addr = interface.Address.BluetoothLeAddr.from_rpc_struct(resp.peer) infuse_id = self._ddb.infuse_id_from_bluetooth(if_addr) if infuse_id is None: @@ -124,16 +120,12 @@ def security_state_done(pkt: PacketReceived, _: int, response: bytes): device_key = response[32:64] network_id = int.from_bytes(response[64:68], "little") - self.ddb.observe_security_state( - pkt.route[0].infuse_id, cloud_key, device_key, network_id - ) + self.ddb.observe_security_state(pkt.route[0].infuse_id, cloud_key, device_key, network_id) if cb_event is not None: cb_event.set() # Generate security_state RPC - cmd_pkt = self.rpc.generate( - 30000, random.randbytes(16), Auth.NETWORK, security_state_done - ) + cmd_pkt = self.rpc.generate(30000, random.randbytes(16), Auth.NETWORK, security_state_done) encrypted = cmd_pkt.to_serial(self.ddb) # Write to serial port Console.log_tx(cmd_pkt.ptype, len(encrypted)) @@ -187,9 +179,7 @@ class memfault_chunk_header(ctypes.LittleEndianStructure): hdr = memfault_chunk_header.from_buffer_copy(p) chunk = p[3 : 3 + hdr.len] p = p[3 + hdr.len :] - print( - f"Memfault Chunk {hdr.cnt:3d}: {base64.b64encode(chunk).decode('utf-8')}" - ) + print(f"Memfault Chunk {hdr.cnt:3d}: {base64.b64encode(chunk).decode('utf-8')}") def _handle_serial_frame(self, frame: bytearray): try: @@ -201,14 +191,10 @@ def _handle_serial_frame(self, frame: bytearray): if not self._common.ddb.has_network_id(self._common.ddb.gateway): # Need to know network ID before we can query the device key self._common.port.ping() - Console.log_info( - f"Dropping {len(frame)} byte packet to query network ID..." - ) + Console.log_info(f"Dropping {len(frame)} byte packet to query network ID...") else: self._common.query_device_key(None) - Console.log_info( - f"Dropping {len(frame)} byte packet to query device key..." - ) + Console.log_info(f"Dropping {len(frame)} byte packet to query device key...") return except cryptography.exceptions.InvalidTag as e: Console.log_error(f"Failed to decode {len(frame)} byte packet {e}") @@ -226,9 +212,7 @@ def _handle_serial_frame(self, frame: bytearray): elif pkt.ptype == InfuseType.KEY_IDS: self._common.query_device_key(None) - notification = ClientNotification( - ClientNotification.Type.EPACKET_RECV, epacket=pkt - ) + notification = ClientNotification(ClientNotification.Type.EPACKET_RECV, epacket=pkt) # Forward to clients self._common.server.broadcast(notification) except (ValueError, KeyError) as e: @@ -277,9 +261,7 @@ def _handle_epacket_send(self, req: GatewayRequest): # Do we have the device public keys we need? for hop in routed.route: - if hop.auth == Auth.DEVICE and not self._common.ddb.has_public_key( - hop.infuse_id - ): + if hop.auth == Auth.DEVICE and not self._common.ddb.has_public_key(hop.infuse_id): cb_event = threading.Event() self._common.query_device_key(cb_event) @@ -291,17 +273,11 @@ def _handle_epacket_send(self, req: GatewayRequest): self._common.port.write(encrypted) def _bt_connect_cb(self, pkt: PacketReceived, rc: int, response: bytes): - resp = defs.bt_connect_infuse.response.from_buffer_copy( - pkt.payload[ctypes.sizeof(rpc.ResponseHeader) :] - ) + resp = defs.bt_connect_infuse.response.from_buffer_copy(pkt.payload[ctypes.sizeof(rpc.ResponseHeader) :]) if_addr = interface.Address.BluetoothLeAddr.from_rpc_struct(resp.peer) infuse_id = self._common.ddb.infuse_id_from_bluetooth(if_addr) - evt = ( - ClientNotification.Type.CONNECTION_FAILED - if rc < 0 - else ClientNotification.Type.CONNECTION_CREATED - ) + evt = ClientNotification.Type.CONNECTION_FAILED if rc < 0 else ClientNotification.Type.CONNECTION_CREATED rsp = ClientNotification( evt, connection_id=infuse_id, @@ -358,9 +334,7 @@ def _handle_conn_release(self, req: GatewayRequest): return disconnect_args = defs.bt_disconnect.request(state.bt_addr.to_rpc_struct()) - cmd = self._common.rpc.generate( - defs.bt_disconnect.COMMAND_ID, bytes(disconnect_args), Auth.DEVICE, None - ) + cmd = self._common.rpc.generate(defs.bt_disconnect.COMMAND_ID, bytes(disconnect_args), Auth.DEVICE, None) encrypted = cmd.to_serial(self._common.ddb) Console.log_tx(cmd.ptype, len(encrypted)) self._common.port.write(encrypted) @@ -420,9 +394,7 @@ def __init__(self, args): else: self.server = LocalServer(default_multicast_address()) self.rpc_server = LocalRpcServer(self.ddb) - self._common = CommonThreadState( - self.server, self.port, self.ddb, self.rpc_server - ) + self._common = CommonThreadState(self.server, self.port, self.ddb, self.rpc_server) self.log = args.log Console.init() diff --git a/src/infuse_iot/tools/localhost.py b/src/infuse_iot/tools/localhost.py index d485e72..c401ef0 100644 --- a/src/infuse_iot/tools/localhost.py +++ b/src/infuse_iot/tools/localhost.py @@ -9,22 +9,23 @@ import pathlib import threading import time + from aiohttp import web from aiohttp.web_request import BaseRequest from aiohttp.web_runner import GracefulExit -from infuse_iot.util.console import Console -from infuse_iot.util.threading import SignaledThread -from infuse_iot.common import InfuseType +import infuse_iot.epacket.interface as interface from infuse_iot.commands import InfuseCommand +from infuse_iot.common import InfuseType from infuse_iot.socket_comms import ( - LocalClient, ClientNotification, + LocalClient, default_multicast_address, ) from infuse_iot.tdf import TDF from infuse_iot.time import InfuseTime -import infuse_iot.epacket.interface as interface +from infuse_iot.util.console import Console +from infuse_iot.util.threading import SignaledThread class SubCommand(InfuseCommand): @@ -125,7 +126,7 @@ def tdf_columns(self, tdf): def column_title(struct, name): postfix = struct._postfix_[name] if postfix != "": - return "{} ({})".format(name, postfix) + return f"{name} ({postfix})" return name for field in tdf.field_information(): @@ -186,9 +187,7 @@ def recv_thread(self): if field.subfield: if field.field not in self._data[source.infuse_id][t.name]: self._data[source.infuse_id][t.name][field.field] = {} - self._data[source.infuse_id][t.name][field.field][ - field.subfield - ] = field.val_fmt() + self._data[source.infuse_id][t.name][field.field][field.subfield] = field.val_fmt() else: self._data[source.infuse_id][t.name][field.field] = field.val_fmt() self._data_lock.release() diff --git a/src/infuse_iot/tools/native_bt.py b/src/infuse_iot/tools/native_bt.py index 9be04ef..ff87934 100644 --- a/src/infuse_iot/tools/native_bt.py +++ b/src/infuse_iot/tools/native_bt.py @@ -11,24 +11,23 @@ from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData -from infuse_iot.util.argparse import BtLeAddress -from infuse_iot.util.console import Console +import infuse_iot.epacket.interface as interface +from infuse_iot.commands import InfuseCommand +from infuse_iot.database import DeviceDatabase from infuse_iot.epacket.packet import ( - CtypeBtAdvFrame, - PacketReceived, - HopReceived, Auth, + CtypeBtAdvFrame, Flags, + HopReceived, + PacketReceived, ) -from infuse_iot.commands import InfuseCommand from infuse_iot.socket_comms import ( - LocalServer, ClientNotification, + LocalServer, default_multicast_address, ) -from infuse_iot.database import DeviceDatabase - -import infuse_iot.epacket.interface as interface +from infuse_iot.util.argparse import BtLeAddress +from infuse_iot.util.console import Console class SubCommand(InfuseCommand): @@ -47,11 +46,7 @@ def __init__(self, args): Console.init() def simple_callback(self, device: BLEDevice, data: AdvertisementData): - addr = interface.Address( - interface.Address.BluetoothLeAddr( - 0, BtLeAddress.integer_value(device.address) - ) - ) + addr = interface.Address(interface.Address.BluetoothLeAddr(0, BtLeAddress.integer_value(device.address))) rssi = data.rssi payload = data.manufacturer_data[self.infuse_manu] @@ -76,9 +71,7 @@ def simple_callback(self, device: BLEDevice, data: AdvertisementData): async def async_run(self): self.server = LocalServer(default_multicast_address()) - scanner = BleakScanner( - self.simple_callback, [self.infuse_service], cb=dict(use_bdaddr=True) - ) + scanner = BleakScanner(self.simple_callback, [self.infuse_service], cb=dict(use_bdaddr=True)) while True: Console.log_info("Starting scanner") diff --git a/src/infuse_iot/tools/provision.py b/src/infuse_iot/tools/provision.py index 6af5cb2..6a1cc6a 100644 --- a/src/infuse_iot/tools/provision.py +++ b/src/infuse_iot/tools/provision.py @@ -6,8 +6,8 @@ __copyright__ = "Copyright 2024, Embeint Inc" import ctypes -from http import HTTPStatus import sys +from http import HTTPStatus try: from simple_term_menu import TerminalMenu @@ -18,15 +18,15 @@ from infuse_iot.api_client import Client from infuse_iot.api_client.api.default import ( - get_device_by_soc_and_mcu_id, create_device, get_all_organisations, - get_boards, get_board_by_id, + get_boards, + get_device_by_soc_and_mcu_id, ) from infuse_iot.api_client.models import NewDevice, NewDeviceMetadata -from infuse_iot.credentials import get_api_key from infuse_iot.commands import InfuseCommand +from infuse_iot.credentials import get_api_key class ProvisioningStruct(ctypes.LittleEndianStructure): @@ -131,9 +131,7 @@ def create_device(self, client, soc, hardware_id_str): options = [f"{o.name:20s} ({o.id})" for o in orgs] if TerminalMenu is None: - sys.exit( - "Specify organisation with --organisation:\n" + "\n".join(options) - ) + sys.exit("Specify organisation with --organisation:\n" + "\n".join(options)) terminal_menu = TerminalMenu(options) idx = terminal_menu.show() @@ -152,9 +150,7 @@ def create_device(self, client, soc, hardware_id_str): board = get_board_by_id.sync(client=client, id=self._board) if board.soc != soc: - sys.exit( - f"Found SoC '{soc}' but board '{board.name}' has SoC '{board.soc}'" - ) + sys.exit(f"Found SoC '{soc}' but board '{board.name}' has SoC '{board.soc}'") new_board = NewDevice( mcu_id=hardware_id_str, @@ -167,9 +163,7 @@ def create_device(self, client, soc, hardware_id_str): response = create_device.sync_detailed(client=client, body=new_board) if response.status_code != HTTPStatus.CREATED: - sys.exit( - f"Failed to create device:\n\t<{response.status_code}> {response.content.decode('utf-8')}" - ) + sys.exit(f"Failed to create device:\n\t<{response.status_code}> {response.content.decode('utf-8')}") def run(self): with LowLevel.API() as api: @@ -187,9 +181,7 @@ def run(self): # Get existing device or create new device with client as client: - response = get_device_by_soc_and_mcu_id.sync_detailed( - client=client, soc=soc, mcu_id=hardware_id_str - ) + response = get_device_by_soc_and_mcu_id.sync_detailed(client=client, soc=soc, mcu_id=hardware_id_str) if response.status_code == HTTPStatus.OK: # Device found, fall through pass @@ -205,9 +197,7 @@ def run(self): f"Failed to query device after creation:\n\t<{response.status_code}> {response.content.decode('utf-8')}" ) print("To provision more devices like this:") - print( - f"\t infuse provision --organisation {self._org} --board {self._board}" - ) + print(f"\t infuse provision --organisation {self._org} --board {self._board}") else: sys.exit( f"Failed to query device information:\n\t<{response.status_code}> {response.content.decode('utf-8')}" @@ -215,27 +205,19 @@ def run(self): # Compare current flash contents to desired flash contents cloud_id = int(response.parsed.device_id, 16) - current_bytes = bytes( - api.read(uicr_addr, ctypes.sizeof(ProvisioningStruct)) - ) + current_bytes = bytes(api.read(uicr_addr, ctypes.sizeof(ProvisioningStruct))) desired = ProvisioningStruct(cloud_id) desired_bytes = bytes(desired) if current_bytes == desired_bytes: - print( - f"HW ID 0x{hardware_id:016x} already provisioned as 0x{desired.device_id:016x}" - ) + print(f"HW ID 0x{hardware_id:016x} already provisioned as 0x{desired.device_id:016x}") else: if current_bytes != len(current_bytes) * b"\xff": - print( - f"HW ID 0x{hardware_id:016x} already has incorrect provisioning info, recover device" - ) + print(f"HW ID 0x{hardware_id:016x} already has incorrect provisioning info, recover device") return api.write(uicr_addr, bytes(desired), True) - print( - f"HW ID 0x{hardware_id:016x} now provisioned as 0x{desired.device_id:016x}" - ) + print(f"HW ID 0x{hardware_id:016x} now provisioned as 0x{desired.device_id:016x}") # Reset the MCU api.pin_reset() diff --git a/src/infuse_iot/tools/rpc.py b/src/infuse_iot/tools/rpc.py index 0163a3c..448fc52 100644 --- a/src/infuse_iot/tools/rpc.py +++ b/src/infuse_iot/tools/rpc.py @@ -6,23 +6,22 @@ __copyright__ = "Copyright 2024, Embeint Inc" import argparse -import random import ctypes import importlib import pkgutil +import random -from infuse_iot.common import InfuseType, InfuseID -from infuse_iot.epacket.packet import PacketOutput +import infuse_iot.rpc_wrappers as wrappers +from infuse_iot import rpc from infuse_iot.commands import InfuseCommand, InfuseRpcCommand +from infuse_iot.common import InfuseID, InfuseType +from infuse_iot.epacket.packet import PacketOutput from infuse_iot.socket_comms import ( - LocalClient, ClientNotification, GatewayRequest, + LocalClient, default_multicast_address, ) -from infuse_iot import rpc - -import infuse_iot.rpc_wrappers as wrappers class SubCommand(InfuseCommand): @@ -33,15 +32,9 @@ class SubCommand(InfuseCommand): @classmethod def add_parser(cls, parser): addr_group = parser.add_mutually_exclusive_group(required=True) - addr_group.add_argument( - "--gateway", action="store_true", help="Run command on local gateway" - ) - addr_group.add_argument( - "--id", type=lambda x: int(x, 0), help="Infuse ID to run command on" - ) - command_list_parser = parser.add_subparsers( - title="commands", metavar="", required=True - ) + addr_group.add_argument("--gateway", action="store_true", help="Run command on local gateway") + addr_group.add_argument("--id", type=lambda x: int(x, 0), help="Infuse ID to run command on") + command_list_parser = parser.add_subparsers(title="commands", metavar="", required=True) for _, name, _ in pkgutil.walk_packages(wrappers.__path__): full_name = f"{wrappers.__name__}.{name}" @@ -93,9 +86,7 @@ def _wait_rpc_rsp(self): if rsp_header.request_id != self._request_id: continue # Convert response bytes back to struct form - rsp_data = self._command.response.from_buffer_copy( - rsp.epacket.payload[ctypes.sizeof(rpc.ResponseHeader) :] - ) + rsp_data = self._command.response.from_buffer_copy(rsp.epacket.payload[ctypes.sizeof(rpc.ResponseHeader) :]) # Handle the response print(f"INFUSE ID: {rsp.epacket.route[0].infuse_id:016x}") self._command.handle_response(rsp_header.return_code, rsp_data) diff --git a/src/infuse_iot/tools/serial_throughput.py b/src/infuse_iot/tools/serial_throughput.py index 1ced257..ac72ea8 100644 --- a/src/infuse_iot/tools/serial_throughput.py +++ b/src/infuse_iot/tools/serial_throughput.py @@ -8,13 +8,13 @@ import random import time -from infuse_iot.common import InfuseType, InfuseID -from infuse_iot.epacket.packet import PacketOutput, Auth from infuse_iot.commands import InfuseCommand +from infuse_iot.common import InfuseID, InfuseType +from infuse_iot.epacket.packet import Auth, PacketOutput from infuse_iot.socket_comms import ( - LocalClient, ClientNotification, GatewayRequest, + LocalClient, default_multicast_address, ) diff --git a/src/infuse_iot/tools/tdf_csv.py b/src/infuse_iot/tools/tdf_csv.py index d8181f4..0d856ed 100644 --- a/src/infuse_iot/tools/tdf_csv.py +++ b/src/infuse_iot/tools/tdf_csv.py @@ -8,11 +8,11 @@ import os import time -from infuse_iot.common import InfuseType from infuse_iot.commands import InfuseCommand +from infuse_iot.common import InfuseType from infuse_iot.socket_comms import ( - LocalClient, ClientNotification, + LocalClient, default_multicast_address, ) from infuse_iot.tdf import TDF @@ -26,9 +26,7 @@ class SubCommand(InfuseCommand): @classmethod def add_parser(cls, parser): - parser.add_argument( - "--unix", action="store_true", help="Save timestamps as unix" - ) + parser.add_argument("--unix", action="store_true", help="Save timestamps as unix") def __init__(self, args): self._client = LocalClient(default_multicast_address(), 1.0) @@ -63,11 +61,7 @@ def run(self): time_str = time_func(time.time()) else: time_str = time_func(reading_time) - line = ( - time_str - + "," - + ",".join([f.val_fmt() for f in reading.iter_fields()]) - ) + line = time_str + "," + ",".join([f.val_fmt() for f in reading.iter_fields()]) lines.append(line) if tdf.period is not None: reading_time += tdf.period @@ -82,9 +76,7 @@ def run(self): else: print(f"Opening new {filename}") files[filename] = open(filename, "w", encoding="utf-8") - headings = "time," + ",".join( - [f.name for f in first.iter_fields()] - ) + headings = "time," + ",".join([f.name for f in first.iter_fields()]) files[filename].write(headings + os.linesep) # Write line to file then flush diff --git a/src/infuse_iot/tools/tdf_list.py b/src/infuse_iot/tools/tdf_list.py index 28f9b1c..3144bdc 100644 --- a/src/infuse_iot/tools/tdf_list.py +++ b/src/infuse_iot/tools/tdf_list.py @@ -7,11 +7,11 @@ import tabulate -from infuse_iot.common import InfuseType from infuse_iot.commands import InfuseCommand +from infuse_iot.common import InfuseType from infuse_iot.socket_comms import ( - LocalClient, ClientNotification, + LocalClient, default_multicast_address, ) from infuse_iot.tdf import TDF @@ -58,13 +58,9 @@ def run(self): t = InfuseTime.utc_time_string(tdf.time + offset) else: t = "N/A" - table.append( - [t, tdf_name, field.name, field.val_fmt(), field.postfix] - ) + table.append([t, tdf_name, field.name, field.val_fmt(), field.postfix]) else: - table.append( - [None, None, field.name, field.val_fmt(), field.postfix] - ) + table.append([None, None, field.name, field.val_fmt(), field.postfix]) print(f"Infuse ID: {source.infuse_id:016x}") print(f"Interface: {source.interface.name}") diff --git a/src/infuse_iot/util/argparse.py b/src/infuse_iot/util/argparse.py index b4a82dc..f109699 100644 --- a/src/infuse_iot/util/argparse.py +++ b/src/infuse_iot/util/argparse.py @@ -3,7 +3,6 @@ import argparse import pathlib import re - from typing import cast diff --git a/src/infuse_iot/util/console.py b/src/infuse_iot/util/console.py index 29f831e..1ee953b 100644 --- a/src/infuse_iot/util/console.py +++ b/src/infuse_iot/util/console.py @@ -2,6 +2,7 @@ import datetime + import colorama diff --git a/src/infuse_iot/util/crypto.py b/src/infuse_iot/util/crypto.py index 881df01..07d03a5 100644 --- a/src/infuse_iot/util/crypto.py +++ b/src/infuse_iot/util/crypto.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.hazmat.primitives.kdf.hkdf import HKDF def hkdf_derive(input_key, salt, info): @@ -16,17 +16,13 @@ def hkdf_derive(input_key, salt, info): return hkdf.derive(input_key) -def chachapoly_encrypt( - key: bytes, associated_data: bytes, nonce: bytes, payload: bytes -) -> bytes: +def chachapoly_encrypt(key: bytes, associated_data: bytes, nonce: bytes, payload: bytes) -> bytes: """Encrypt a payload using ChaCha20-Poly1305""" cipher = ChaCha20Poly1305(key) return cipher.encrypt(nonce, payload, associated_data) -def chachapoly_decrypt( - key: bytes, associated_data: bytes, nonce: bytes, payload: bytes -) -> bytes: +def chachapoly_decrypt(key: bytes, associated_data: bytes, nonce: bytes, payload: bytes) -> bytes: """Decrypt a payload using ChaCha20-Poly1305""" cipher = ChaCha20Poly1305(key) return cipher.decrypt(nonce, payload, associated_data) diff --git a/tests/test_main.py b/tests/test_main.py index 5e01fb6..8c987f3 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -14,9 +14,7 @@ def test_main(): # sane (i.e. the actual version number is printed instead of # simply an error message to stderr). - output_as_module = subprocess.check_output( - [sys.executable, "-m", "infuse_iot", "--version"] - ).decode() + output_as_module = subprocess.check_output([sys.executable, "-m", "infuse_iot", "--version"]).decode() output_directly = subprocess.check_output(["infuse", "--version"]).decode() assert infuse_iot.version.__version__ in output_as_module assert output_as_module == output_directly From 1754d7093240b9d1bc4defe7fa733324c5dee330 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Wed, 27 Nov 2024 15:03:29 +1000 Subject: [PATCH 5/6] treewide: manual error correction Fix the ruff errors that it can't do automatically. Signed-off-by: Jordan Yates --- src/infuse_iot/database.py | 5 +- src/infuse_iot/diff.py | 82 ++++++++++++----------- src/infuse_iot/generated/tdf_base.py | 3 +- src/infuse_iot/serial_comms.py | 2 +- src/infuse_iot/tools/provision.py | 12 ++-- src/infuse_iot/tools/serial_throughput.py | 6 +- src/infuse_iot/tools/tdf_csv.py | 4 +- src/infuse_iot/util/argparse.py | 2 +- 8 files changed, 59 insertions(+), 57 deletions(-) diff --git a/src/infuse_iot/database.py b/src/infuse_iot/database.py index 19302de..d2d52c4 100644 --- a/src/infuse_iot/database.py +++ b/src/infuse_iot/database.py @@ -36,7 +36,8 @@ class DeviceDatabase: """Database of current device state""" _network_keys = { - 0x000000: b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f", + 0x000000: b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f" + b"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f", } _derived_keys: Dict[Tuple[int, bytes, int], bytes] = {} @@ -114,7 +115,7 @@ def _network_key(self, network_id: int, interface: bytes, gps_time: int) -> byte try: info = load_network(network_id) except FileNotFoundError: - raise UnknownNetworkError + raise UnknownNetworkError from None self._network_keys[network_id] = info["key"] base = self._network_keys[network_id] time_idx = gps_time // (60 * 60 * 24) diff --git a/src/infuse_iot/diff.py b/src/infuse_iot/diff.py index 354b188..9a147b9 100644 --- a/src/infuse_iot/diff.py +++ b/src/infuse_iot/diff.py @@ -601,16 +601,16 @@ def _cleanup_jumps(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: len(instructions) >= 3 and isinstance(instructions[1], WriteInstr) and isinstance(instructions[2], SetAddrInstr) - ): # ADDR, COPY, WRITE, ADRR - if instr.shift == -instructions[2].shift: - write = instructions[1] - # Replace with a merged write instead - merged.append(WriteInstr(old[instr.new : instr.new + copy.length] + write.data)) - replaced = True - instructions.pop(0) - instructions.pop(0) - instructions.pop(0) + and instr.shift == -instructions[2].shift + ): + write = instructions[1] + # Replace with a merged write instead + merged.append(WriteInstr(old[instr.new : instr.new + copy.length] + write.data)) + replaced = True + instructions.pop(0) + instructions.pop(0) + instructions.pop(0) if not replaced: merged.append(instr) @@ -642,17 +642,11 @@ def finalise(): to_merge = [] for instr in instructions: - pended = False - - if isinstance(instr, CopyInstr): - if instr.length < 128: - to_merge.append(instr) - pended = True - elif isinstance(instr, WriteInstr): - if len(to_merge) > 0 and len(instr.data) < 256: - to_merge.append(instr) - pended = True - if not pended: + if (isinstance(instr, CopyInstr) and instr.length < 128) or ( + isinstance(instr, WriteInstr) and len(to_merge) > 0 and len(instr.data) < 256 + ): + to_merge.append(instr) + else: finalise() merged.append(instr) @@ -828,9 +822,11 @@ def _patch_load(cls, patch_binary: bytes): raise ValidationError( f"Patch data length does not match header information ({len(data)} != {hdr.patch_file.length})" ) - if binascii.crc32(data) != hdr.patch_file.crc: + crc_patch = binascii.crc32(data) + crc_expected = hdr.patch_file.crc + if crc_patch != hdr.patch_file.crc: raise ValidationError( - f"Patch data CRC does not match patch information ({binascii.crc32(data):08x} != {hdr.patch_file.crc:08x})" + f"Patch data CRC does not match patch information ({crc_patch:08x} != {crc_expected:08x})" ) instructions = [] @@ -933,13 +929,17 @@ def patch( patched = b"" orig_offset = 0 - if len(bin_original) != meta["original"]["len"]: + len_orig = len(bin_original) + len_expected = meta["original"]["len"] + if len_orig != len_expected: raise ValidationError( - f"Original file length does not match patch information ({len(bin_original)} != {meta['original']['len']})" + f"Original file length does not match patch information ({len_orig} != {len_expected})" ) - if binascii.crc32(bin_original) != meta["original"]["crc"]: + crc_orig = binascii.crc32(bin_original) + crc_expected = meta["original"]["crc"] + if crc_orig != crc_expected: raise ValidationError( - f"Original file CRC does not match patch information ({binascii.crc32(bin_original):08x} != {meta['original']['crc']:08x})" + f"Original file CRC does not match patch information ({crc_orig:08x} != {crc_expected:08x})" ) for instr in instructions: @@ -965,13 +965,17 @@ def patch( assert 0 # Validate generated file matches what was expected - if len(patched) != meta["new"]["len"]: + len_patched = len(patched) + len_expected = meta["new"]["len"] + if len_patched != len_expected: raise ValidationError( - f"Original file length does not match patch information ({len(patched)} != {meta['new']['len']})" + f"Original file length does not match patch information ({len_patched} != {len_expected})" ) - if binascii.crc32(patched) != meta["new"]["crc"]: + crc_patched = binascii.crc32(patched) + crc_expected = meta["new"]["crc"] + if crc_patched != crc_expected: raise ValidationError( - f"Original file CRC does not match patch information ({binascii.crc32(patched):08x} != {meta['new']['crc']:08x})" + f"Original file CRC does not match patch information ({crc_patched:08x} != {crc_expected:08x})" ) return patched @@ -1046,13 +1050,12 @@ def dump( # Run requested command if args.command == "generate": - with open(args.original, "rb") as f_orig: - with open(args.new, "rb") as f_new: - patch = diff.generate( - f_orig.read(-1), - f_new.read(-1), - args.verbose, - ) + with open(args.original, "rb") as f_orig, open(args.new, "rb") as f_new: + patch = diff.generate( + f_orig.read(-1), + f_new.read(-1), + args.verbose, + ) with open(args.patch, "wb") as f_output: f_output.write(patch) elif args.command == "validation": @@ -1061,9 +1064,8 @@ def dump( with open(args.patch, "wb") as f_output: f_output.write(patch) elif args.command == "patch": - with open(args.original, "rb") as f_orig: - with open(args.patch, "rb") as f_patch: - output = diff.patch(f_orig.read(-1), f_patch.read(-1)) + with open(args.original, "rb") as f_orig, open(args.patch, "rb") as f_patch: + output = diff.patch(f_orig.read(-1), f_patch.read(-1)) with open(args.output, "wb") as f_output: f_output.write(output) elif args.command == "dump": diff --git a/src/infuse_iot/generated/tdf_base.py b/src/infuse_iot/generated/tdf_base.py index c52eddb..cb3c4bc 100644 --- a/src/infuse_iot/generated/tdf_base.py +++ b/src/infuse_iot/generated/tdf_base.py @@ -55,8 +55,7 @@ def iter_fields( f_name = _public_name(field) val = getattr(self, f_name) if isinstance(val, ctypes.LittleEndianStructure): - for subfield in val.iter_fields(f_name): - yield subfield + yield from val.iter_fields(f_name) else: if isinstance(val, ctypes.Array): val = list(val) diff --git a/src/infuse_iot/serial_comms.py b/src/infuse_iot/serial_comms.py index 3d8312a..2605fc9 100644 --- a/src/infuse_iot/serial_comms.py +++ b/src/infuse_iot/serial_comms.py @@ -104,7 +104,7 @@ def open(self): if desc.name == "modem_trace": f = f"{int(time.time())}_nrf_modem_trace.bin" print(f"Found nRF LTE modem trace channel (opening {f:s})") - self._modem_trace = open(f, mode="wb") + self._modem_trace = open(f, mode="wb") # noqa: SIM115 self._modem_trace_buf = desc.BufferIndex def read_bytes(self, num): diff --git a/src/infuse_iot/tools/provision.py b/src/infuse_iot/tools/provision.py index 6a1cc6a..b4b7f65 100644 --- a/src/infuse_iot/tools/provision.py +++ b/src/infuse_iot/tools/provision.py @@ -193,15 +193,15 @@ def run(self): client=client, soc=soc, mcu_id=hardware_id_str ) if response.status_code != HTTPStatus.OK: - sys.exit( - f"Failed to query device after creation:\n\t<{response.status_code}> {response.content.decode('utf-8')}" - ) + err = "Failed to query device after creation:\n" + err += f"\t<{response.status_code}> {response.content.decode('utf-8')}" + sys.exit(err) print("To provision more devices like this:") print(f"\t infuse provision --organisation {self._org} --board {self._board}") else: - sys.exit( - f"Failed to query device information:\n\t<{response.status_code}> {response.content.decode('utf-8')}" - ) + err = "Failed to query device information:\n" + err += f"\t<{response.status_code}> {response.content.decode('utf-8')}" + sys.exit(err) # Compare current flash contents to desired flash contents cloud_id = int(response.parsed.device_id, 16) diff --git a/src/infuse_iot/tools/serial_throughput.py b/src/infuse_iot/tools/serial_throughput.py index ac72ea8..9ae6112 100644 --- a/src/infuse_iot/tools/serial_throughput.py +++ b/src/infuse_iot/tools/serial_throughput.py @@ -76,9 +76,9 @@ def run_send_test(self, num, size, queue_size): throughput = total / duration if responses != num: print(f"\tOnly received {responses}/{num} responses") - print( - f"\t{num} packets with {size:3d} bytes payload complete in {duration:.2f} seconds ({int(8*throughput):6d} bps)" - ) + msg = f"\t{num} packets with {size:3d} bytes payload complete in {duration:.2f} seconds" + msg += f" ({int(8*throughput):6d} bps)" + print(msg) def run(self): # No queuing diff --git a/src/infuse_iot/tools/tdf_csv.py b/src/infuse_iot/tools/tdf_csv.py index 0d856ed..e4afdc1 100644 --- a/src/infuse_iot/tools/tdf_csv.py +++ b/src/infuse_iot/tools/tdf_csv.py @@ -72,10 +72,10 @@ def run(self): if filename not in files: if os.path.exists(filename): print(f"Appending to existing {filename}") - files[filename] = open(filename, "a", encoding="utf-8") + files[filename] = open(filename, "a", encoding="utf-8") # noqa: SIM115 else: print(f"Opening new {filename}") - files[filename] = open(filename, "w", encoding="utf-8") + files[filename] = open(filename, "w", encoding="utf-8") # noqa: SIM115 headings = "time," + ",".join([f.name for f in first.iter_fields()]) files[filename].write(headings + os.linesep) diff --git a/src/infuse_iot/util/argparse.py b/src/infuse_iot/util/argparse.py index f109699..b5d1b31 100644 --- a/src/infuse_iot/util/argparse.py +++ b/src/infuse_iot/util/argparse.py @@ -30,7 +30,7 @@ def __new__(cls, string) -> int: # type: ignore try: addr = int(string, 16) except ValueError: - raise argparse.ArgumentTypeError(f"{string} is not a Bluetooth address") + raise argparse.ArgumentTypeError(f"{string} is not a Bluetooth address") from None return addr @classmethod From 8720f3d690c3815829249181d728b1c6a46e3458 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Wed, 27 Nov 2024 15:10:54 +1000 Subject: [PATCH 6/6] treewide: typing updates Use the builtin type definitions instead of the `typing` module. Signed-off-by: Jordan Yates --- src/infuse_iot/commands.py | 5 +-- src/infuse_iot/database.py | 7 ++-- src/infuse_iot/diff.py | 41 ++++++++++----------- src/infuse_iot/epacket/common.py | 5 +-- src/infuse_iot/epacket/interface.py | 13 +++---- src/infuse_iot/epacket/packet.py | 32 ++++++++-------- src/infuse_iot/generated/tdf_base.py | 3 +- src/infuse_iot/generated/tdf_definitions.py | 3 +- src/infuse_iot/socket_comms.py | 14 +++---- src/infuse_iot/tdf.py | 6 +-- src/infuse_iot/tools/gateway.py | 4 +- 11 files changed, 64 insertions(+), 69 deletions(-) diff --git a/src/infuse_iot/commands.py b/src/infuse_iot/commands.py index 4be08b1..f5d943b 100644 --- a/src/infuse_iot/commands.py +++ b/src/infuse_iot/commands.py @@ -7,7 +7,6 @@ import argparse import ctypes -from typing import List, Tuple, Type from infuse_iot.epacket.packet import Auth @@ -61,9 +60,9 @@ def handle_response(self, return_code, response): raise NotImplementedError class VariableSizeResponse: - base_fields: List[Tuple[str, Type[ctypes._SimpleCData]]] = [] + base_fields: list[tuple[str, type[ctypes._SimpleCData]]] = [] var_name = "x" - var_type: Type[ctypes._SimpleCData] = ctypes.c_ubyte + var_type: type[ctypes._SimpleCData] = ctypes.c_ubyte @classmethod def from_buffer_copy(cls, source, offset=0): diff --git a/src/infuse_iot/database.py b/src/infuse_iot/database.py index d2d52c4..745a06f 100644 --- a/src/infuse_iot/database.py +++ b/src/infuse_iot/database.py @@ -2,7 +2,6 @@ import base64 import binascii -from typing import Dict, Tuple from infuse_iot.api_client import Client from infuse_iot.api_client.api.default import get_shared_secret @@ -39,7 +38,7 @@ class DeviceDatabase: 0x000000: b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f" b"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f", } - _derived_keys: Dict[Tuple[int, bytes, int], bytes] = {} + _derived_keys: dict[tuple[int, bytes, int], bytes] = {} class DeviceState: """Device State""" @@ -65,8 +64,8 @@ def gatt_sequence_num(self): def __init__(self) -> None: self.gateway: int | None = None - self.devices: Dict[int, DeviceDatabase.DeviceState] = {} - self.bt_addr: Dict[InterfaceAddress.BluetoothLeAddr, int] = {} + self.devices: dict[int, DeviceDatabase.DeviceState] = {} + self.bt_addr: dict[InterfaceAddress.BluetoothLeAddr, int] = {} def observe_device( self, diff --git a/src/infuse_iot/diff.py b/src/infuse_iot/diff.py index 9a147b9..f299703 100644 --- a/src/infuse_iot/diff.py +++ b/src/infuse_iot/diff.py @@ -5,7 +5,6 @@ import ctypes import enum from collections import defaultdict -from typing import Dict, List, Tuple, Type from typing_extensions import Self @@ -128,7 +127,7 @@ def ctypes_class(self): return self.SetAddrU32 @classmethod - def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> Tuple[Self, int, int]: + def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> tuple[Self, int, int]: opcode = b[offset] if opcode == OpCode.ADDR_SHIFT_S8: s8 = cls.ShiftAddrS8.from_buffer_copy(b, offset) @@ -229,7 +228,7 @@ def ctypes_class(self): return self.CopyU32 @classmethod - def _from_opcode(cls, op: OpCode) -> Type[CopyGeneric]: + def _from_opcode(cls, op: OpCode) -> type[CopyGeneric]: if op == OpCode.COPY_LEN_U4: return cls.CopyU4 elif op == OpCode.COPY_LEN_U12: @@ -242,7 +241,7 @@ def _from_opcode(cls, op: OpCode) -> Type[CopyGeneric]: raise RuntimeError @classmethod - def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> Tuple[Self, int, int]: + def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> tuple[Self, int, int]: opcode = OpCode.from_byte(b[offset]) op_class = cls._from_opcode(opcode) s = op_class.from_buffer_copy(b, offset) @@ -332,7 +331,7 @@ def ctypes_class(self): return self.WriteU32 @classmethod - def _from_opcode(cls, op: OpCode) -> Type[WriteGeneric]: + def _from_opcode(cls, op: OpCode) -> type[WriteGeneric]: if op == OpCode.WRITE_LEN_U4: return cls.WriteU4 elif op == OpCode.WRITE_LEN_U12: @@ -345,7 +344,7 @@ def _from_opcode(cls, op: OpCode) -> Type[WriteGeneric]: raise RuntimeError @classmethod - def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> Tuple[Self, int, int]: + def from_bytes(cls, b: bytes, offset: int, original_offset: int) -> tuple[Self, int, int]: opcode = OpCode.from_byte(b[offset]) op_class = cls._from_opcode(opcode) s = op_class.from_buffer_copy(b, offset) @@ -397,7 +396,7 @@ def ctypes_class(self): @classmethod def from_bytes(cls, b: bytes, offset: int, original_offset: int): assert b[offset] == OpCode.PATCH - operations: List[Instr] = [] + operations: list[Instr] = [] length = 1 while True: @@ -498,7 +497,7 @@ class ArrayValidation(ctypes.LittleEndianStructure): @classmethod def _naive_diff(cls, old: bytes, new: bytes, hash_len: int = 8): """Construct basic runs of WRITE, COPY, and SET_ADDR instructions""" - instr: List[Instr] = [] + instr: list[Instr] = [] old_offset = 0 new_offset = 0 write_start = 0 @@ -577,10 +576,10 @@ def _naive_diff(cls, old: bytes, new: bytes, hash_len: int = 8): return instr @classmethod - def _cleanup_jumps(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: + def _cleanup_jumps(cls, old: bytes, instructions: list[Instr]) -> list[Instr]: """Find locations that jumped backwards just to jump forward to original location""" - merged: List[Instr] = [] + merged: list[Instr] = [] while len(instructions) > 0: instr = instructions.pop(0) replaced = False @@ -625,10 +624,10 @@ def _cleanup_jumps(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: return cleaned @classmethod - def _merge_operations(cls, instructions: List[Instr]) -> List[Instr]: + def _merge_operations(cls, instructions: list[Instr]) -> list[Instr]: """Merge runs of COPY and WRITE into PATCH""" - merged: List[Instr] = [] - to_merge: List[Instr] = [] + merged: list[Instr] = [] + to_merge: list[Instr] = [] def finalise(): nonlocal merged @@ -655,10 +654,10 @@ def finalise(): return merged @classmethod - def _write_crack(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: + def _write_crack(cls, old: bytes, instructions: list[Instr]) -> list[Instr]: """Crack a WRITE operation into a [WRITE,COPY,WRITE] if COPY is at least 2 bytes""" - cracked: List[Instr] = [] + cracked: list[Instr] = [] old_offset = 0 while len(instructions): @@ -735,7 +734,7 @@ def _write_crack(cls, old: bytes, instructions: List[Instr]) -> List[Instr]: return cracked @classmethod - def _gen_patch_instr(cls, bin_orig: bytes, bin_new: bytes) -> Tuple[Dict, List[Instr]]: + def _gen_patch_instr(cls, bin_orig: bytes, bin_new: bytes) -> tuple[dict, list[Instr]]: best_patch = [] best_patch_len = 2**32 @@ -765,7 +764,7 @@ def _gen_patch_instr(cls, bin_orig: bytes, bin_new: bytes) -> Tuple[Dict, List[I return metadata, best_patch @classmethod - def _gen_patch_header(cls, patch_metadata: Dict, patch_data: bytes): + def _gen_patch_header(cls, patch_metadata: dict, patch_data: bytes): hdr = cls.PatchHeader( cls.PatchHeader.magic_value, cls.PatchHeader.VERSION_MAJOR, @@ -789,7 +788,7 @@ def _gen_patch_header(cls, patch_metadata: Dict, patch_data: bytes): return bytes(hdr) @classmethod - def _gen_patch_data(cls, instructions: List[Instr]): + def _gen_patch_data(cls, instructions: list[Instr]): output_bytes = b"" for instr in instructions: output_bytes += bytes(instr) @@ -857,7 +856,7 @@ def generate( print(f" Patch File: {len(bin_patch):6d} bytes ({ratio:.2f}%) ({len(instructions):5d} instructions)") if verbose: - class_count: Dict[OpCode, int] = defaultdict(int) + class_count: dict[OpCode, int] = defaultdict(int) for instr in instructions: class_count[instr.ctypes_class().op] += 1 @@ -878,7 +877,7 @@ def validation(cls, bin_original: bytes, invalid_length: bool, invalid_crc: bool assert len(bin_original) > 1024 # Manually construct an instruction set that runs all instructions - instructions: List[Instr] = [] + instructions: list[Instr] = [] instructions.append(WriteInstr(bin_original[:8], cls_override=WriteInstr.WriteU4)) instructions.append(WriteInstr(bin_original[8:16], cls_override=WriteInstr.WriteU12)) instructions.append(SetAddrInstr(16, 8, cls_override=SetAddrInstr.ShiftAddrS8)) @@ -992,7 +991,7 @@ def dump( print(f" New File: {meta['new']['len']:6d} bytes") print(f" Patch File: {len(bin_patch)} bytes ({len(instructions):5d} instructions)") - class_count: Dict[OpCode, int] = defaultdict(int) + class_count: dict[OpCode, int] = defaultdict(int) for instr in instructions: class_count[instr.ctypes_class().op] += 1 if isinstance(instr, WriteInstr): diff --git a/src/infuse_iot/epacket/common.py b/src/infuse_iot/epacket/common.py index b6499fd..bcdd598 100644 --- a/src/infuse_iot/epacket/common.py +++ b/src/infuse_iot/epacket/common.py @@ -1,16 +1,15 @@ #!/usr/bin/env python3 -from typing import Dict from typing_extensions import Self class Serializable: - def to_json(self) -> Dict: + def to_json(self) -> dict: """Convert class to json dictionary""" raise NotImplementedError @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: """Reconstruct class from json dictionary""" raise NotImplementedError diff --git a/src/infuse_iot/epacket/interface.py b/src/infuse_iot/epacket/interface.py index feb26c6..8c6fc63 100644 --- a/src/infuse_iot/epacket/interface.py +++ b/src/infuse_iot/epacket/interface.py @@ -2,7 +2,6 @@ import ctypes import enum -from typing import Dict from typing_extensions import Self @@ -29,11 +28,11 @@ def __str__(self): def len(self): return 0 - def to_json(self) -> Dict: + def to_json(self) -> dict: return {"i": "SERIAL"} @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: return cls() class BluetoothLeAddr(Serializable): @@ -66,11 +65,11 @@ def to_ctype(self) -> CtypesFormat: """Convert the address to the ctype format""" return self.CtypesFormat(self.addr_type, bytes_to_uint8(self.addr_val.to_bytes(6, "little"))) - def to_json(self) -> Dict: + def to_json(self) -> dict: return {"i": "BT", "t": self.addr_type, "v": self.addr_val} @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: return cls(values["t"], values["v"]) def to_rpc_struct(self) -> rpc_defs.rpc_struct_bt_addr_le: @@ -93,11 +92,11 @@ def __str__(self): def len(self): return self.val.len() - def to_json(self) -> Dict: + def to_json(self) -> dict: return self.val.to_json() @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: if values["i"] == "BT": return cls(cls.BluetoothLeAddr.from_json(values)) elif values["i"] == "SERIAL": diff --git a/src/infuse_iot/epacket/packet.py b/src/infuse_iot/epacket/packet.py index f15e98a..e6c8993 100644 --- a/src/infuse_iot/epacket/packet.py +++ b/src/infuse_iot/epacket/packet.py @@ -5,7 +5,7 @@ import enum import random import time -from typing import Any, Dict, List, Tuple +from typing import Any from typing_extensions import Self @@ -36,7 +36,7 @@ def __init__(self, infuse_id: int, interface: Interface, auth: Auth): self.interface = interface self.auth = auth - def to_json(self) -> Dict: + def to_json(self) -> dict: return { "id": self.infuse_id, "interface": self.interface.value, @@ -44,7 +44,7 @@ def to_json(self) -> Dict: } @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: return cls( infuse_id=values["id"], interface=Interface(values["interface"]), @@ -78,7 +78,7 @@ def __init__( self.sequence = sequence self.rssi = rssi - def to_json(self) -> Dict: + def to_json(self) -> dict: return { "id": self.infuse_id, "interface": self.interface.value, @@ -91,7 +91,7 @@ def to_json(self) -> Dict: } @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: interface = Interface(values["interface"]) return cls( infuse_id=values["id"], @@ -108,13 +108,13 @@ def from_json(cls, values: Dict) -> Self: class PacketReceived(Serializable): """ePacket received by a gateway""" - def __init__(self, route: List[HopReceived], ptype: InfuseType, payload: bytes): + def __init__(self, route: list[HopReceived], ptype: InfuseType, payload: bytes): # [Original Transmission, hop, hop, serial] self.route = route self.ptype = ptype self.payload = payload - def to_json(self) -> Dict: + def to_json(self) -> dict: """Convert class to json dictionary""" return { "route": [x.to_json() for x in self.route], @@ -123,7 +123,7 @@ def to_json(self) -> Dict: } @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: """Reconstruct class from json dictionary""" return cls( route=[HopReceived.from_json(x) for x in values["route"]], @@ -132,7 +132,7 @@ def from_json(cls, values: Dict) -> Self: ) @classmethod - def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> List[Self]: + def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> list[Self]: header, decrypted = CtypeSerialFrame.decrypt(database, serial_frame) # Packet from local gateway @@ -149,7 +149,7 @@ def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> List[Self del packet_bytes[: ctypes.sizeof(common_header)] # Only Bluetooth advertising supported for now - decode_mapping: Dict[Interface, Any] = { + decode_mapping: dict[Interface, Any] = { Interface.BT_ADV: CtypeBtAdvFrame, Interface.BT_PERIPHERAL: CtypeBtGattFrame, Interface.BT_CENTRAL: CtypeBtGattFrame, @@ -215,7 +215,7 @@ def from_serial(cls, database: DeviceDatabase, serial_frame: bytes) -> List[Self class PacketOutputRouted(Serializable): """ePacket to be transmitted by gateway with complete route""" - def __init__(self, route: List[HopOutput], ptype: InfuseType, payload: bytes): + def __init__(self, route: list[HopOutput], ptype: InfuseType, payload: bytes): # [Serial, hop, hop, final_hop] self.route = route self.ptype = ptype @@ -282,7 +282,7 @@ def to_serial(self, database: DeviceDatabase) -> bytes: ciphertext = chachapoly_encrypt(key, header_bytes[:11], header_bytes[11:], payload) return header_bytes + ciphertext - def to_json(self) -> Dict: + def to_json(self) -> dict: return { "route": [x.to_json() for x in self.route], "type": self.ptype.value, @@ -290,7 +290,7 @@ def to_json(self) -> Dict: } @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: return cls( route=[HopOutput.from_json(x) for x in values["route"]], ptype=InfuseType(values["type"]), @@ -307,7 +307,7 @@ def __init__(self, infuse_id: int, auth: Auth, ptype: InfuseType, payload: bytes self.ptype = ptype self.payload = payload - def to_json(self) -> Dict: + def to_json(self) -> dict: return { "infuse_id": self.infuse_id, "auth": self.auth, @@ -316,7 +316,7 @@ def to_json(self) -> Dict: } @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: return cls( infuse_id=values["infuse_id"], auth=Auth(values["auth"]), @@ -370,7 +370,7 @@ def device_id(self, value): self._device_id_lower = value & 0xFFFFFFFF @classmethod - def parse(cls, frame: bytes) -> Tuple[Self, int]: + def parse(cls, frame: bytes) -> tuple[Self, int]: """Parse frame into header and payload length""" return ( cls.from_buffer_copy(frame), diff --git a/src/infuse_iot/generated/tdf_base.py b/src/infuse_iot/generated/tdf_base.py index cb3c4bc..e2cab11 100644 --- a/src/infuse_iot/generated/tdf_base.py +++ b/src/infuse_iot/generated/tdf_base.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 import ctypes -from typing import Any, Generator, cast +from collections.abc import Generator +from typing import Any, cast from typing_extensions import Self diff --git a/src/infuse_iot/generated/tdf_definitions.py b/src/infuse_iot/generated/tdf_definitions.py index 1d89ab2..b8273fb 100644 --- a/src/infuse_iot/generated/tdf_definitions.py +++ b/src/infuse_iot/generated/tdf_definitions.py @@ -3,7 +3,6 @@ """Autogenerated TDF decoding logic""" import ctypes -from typing import Dict from infuse_iot.generated.tdf_base import TdfReadingBase, TdfStructBase @@ -803,7 +802,7 @@ class array_type(TdfReadingBase): } -id_type_mapping: Dict[int, TdfReadingBase] = { +id_type_mapping: dict[int, TdfReadingBase] = { 1: readings.announce, 2: readings.battery_state, 3: readings.ambient_temp_pres_hum, diff --git a/src/infuse_iot/socket_comms.py b/src/infuse_iot/socket_comms.py index 7bb721e..03155ca 100644 --- a/src/infuse_iot/socket_comms.py +++ b/src/infuse_iot/socket_comms.py @@ -4,7 +4,7 @@ import json import socket import struct -from typing import Any, Dict +from typing import Any from typing_extensions import Self @@ -32,9 +32,9 @@ def __init__( self.epacket = epacket self.connection_id = connection_id - def to_json(self) -> Dict: + def to_json(self) -> dict: """Convert class to json dictionary""" - out: Dict[str, Any] = {"type": int(self.type)} + out: dict[str, Any] = {"type": int(self.type)} if self.epacket: out["epacket"] = self.epacket.to_json() if self.connection_id: @@ -42,7 +42,7 @@ def to_json(self) -> Dict: return out @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: """Reconstruct class from json dictionary""" if j := values.get("epacket"): epacket = PacketReceived.from_json(j) @@ -73,9 +73,9 @@ def __init__( self.epacket = epacket self.connection_id = connection_id - def to_json(self) -> Dict: + def to_json(self) -> dict: """Convert class to json dictionary""" - out: Dict[str, Any] = {"type": int(self.type)} + out: dict[str, Any] = {"type": int(self.type)} if self.epacket: out["epacket"] = self.epacket.to_json() if self.connection_id: @@ -83,7 +83,7 @@ def to_json(self) -> Dict: return out @classmethod - def from_json(cls, values: Dict) -> Self: + def from_json(cls, values: dict) -> Self: """Reconstruct class from json dictionary""" if j := values.get("epacket"): epacket = PacketOutput.from_json(j) diff --git a/src/infuse_iot/tdf.py b/src/infuse_iot/tdf.py index 08c76bb..3070347 100644 --- a/src/infuse_iot/tdf.py +++ b/src/infuse_iot/tdf.py @@ -2,7 +2,7 @@ import ctypes import enum -from typing import Generator, List, Type +from collections.abc import Generator from infuse_iot.generated import tdf_base, tdf_definitions from infuse_iot.time import InfuseTime @@ -60,7 +60,7 @@ def __init__( self, time: None | float, period: None | float, - data: List[tdf_base.TdfReadingBase], + data: list[tdf_base.TdfReadingBase], ): self.time = time self.period = period @@ -70,7 +70,7 @@ def __init__(self): pass @staticmethod - def _buffer_pull(buffer: bytes, ctype: Type[ctypes.LittleEndianStructure]): + def _buffer_pull(buffer: bytes, ctype: type[ctypes.LittleEndianStructure]): v = ctype.from_buffer_copy(buffer) b = buffer[ctypes.sizeof(ctype) :] return v, b diff --git a/src/infuse_iot/tools/gateway.py b/src/infuse_iot/tools/gateway.py index 2681f2b..3d5f075 100644 --- a/src/infuse_iot/tools/gateway.py +++ b/src/infuse_iot/tools/gateway.py @@ -13,7 +13,7 @@ import random import threading import time -from typing import Callable, Dict +from typing import Callable import cryptography import cryptography.exceptions @@ -51,7 +51,7 @@ class LocalRpcServer: def __init__(self, database: DeviceDatabase): self._cnt = random.randint(0, 2**31) self._ddb = database - self._queued: Dict[int, Callable | None] = {} + self._queued: dict[int, Callable | None] = {} def generate(self, command: int, args: bytes, auth: Auth, cb: Callable | None): """Generate RPC packet from arguments"""