diff --git a/src/infuse_iot/commands.py b/src/infuse_iot/commands.py index f65779c..3b1de97 100644 --- a/src/infuse_iot/commands.py +++ b/src/infuse_iot/commands.py @@ -71,6 +71,10 @@ def data_payload(self) -> bytes: """Payload to send with RPC_DATA""" raise NotImplementedError + def data_payload_recv_len(self) -> int: + """Length of payload to receive with RPC_DATA""" + return 0xFFFFFFFF + def data_recv_cb(self, offset: int, data: bytes) -> None: """Data received callback""" diff --git a/src/infuse_iot/generated/kv_definitions.py b/src/infuse_iot/generated/kv_definitions.py index fec6df0..01431cf 100644 --- a/src/infuse_iot/generated/kv_definitions.py +++ b/src/infuse_iot/generated/kv_definitions.py @@ -280,6 +280,35 @@ class geofence(VLACompatLittleEndianStruct): vla_field = ("points", 0 * structs.gcs_location) _pack_ = 1 + class task_schedules_default_id(VLACompatLittleEndianStruct): + """Unique identifier for default schedule set""" + + NAME = "TASK_SCHEDULES_DEFAULT_ID" + BASE_ID = 1000 + RANGE = 1 + _fields_ = [ + ("set_id", ctypes.c_uint32), + ] + _pack_ = 1 + + class task_schedules(VLACompatLittleEndianStruct): + """Task runner task schedule definition (@ref task_schedule)""" + + NAME = "TASK_SCHEDULES" + BASE_ID = 1001 + RANGE = 32 + _fields_ = [ + ("task_id", ctypes.c_uint8), + ("validity", ctypes.c_uint8), + ("periodicity_type", ctypes.c_uint8), + ("timeout_s", ctypes.c_uint32), + ("battery_start_threshold", ctypes.c_uint8), + ("battery_terminate_threshold", ctypes.c_uint8), + ("periodicity", ctypes.c_uint32), + ] + vla_field = ("_remainder", 0 * ctypes.c_uint8) + _pack_ = 1 + class secure_storage_reserved(VLACompatLittleEndianStruct): """Keys reserved for secure storage (do not enable)""" @@ -328,6 +357,39 @@ class secure_storage_reserved(VLACompatLittleEndianStruct): 113: geofence, 114: geofence, 115: geofence, + 1000: task_schedules_default_id, + 1001: task_schedules, + 1002: task_schedules, + 1003: task_schedules, + 1004: task_schedules, + 1005: task_schedules, + 1006: task_schedules, + 1007: task_schedules, + 1008: task_schedules, + 1009: task_schedules, + 1010: task_schedules, + 1011: task_schedules, + 1012: task_schedules, + 1013: task_schedules, + 1014: task_schedules, + 1015: task_schedules, + 1016: task_schedules, + 1017: task_schedules, + 1018: task_schedules, + 1019: task_schedules, + 1020: task_schedules, + 1021: task_schedules, + 1022: task_schedules, + 1023: task_schedules, + 1024: task_schedules, + 1025: task_schedules, + 1026: task_schedules, + 1027: task_schedules, + 1028: task_schedules, + 1029: task_schedules, + 1030: task_schedules, + 1031: task_schedules, + 1032: task_schedules, 30000: secure_storage_reserved, 30001: secure_storage_reserved, 30002: secure_storage_reserved, diff --git a/src/infuse_iot/generated/rpc_definitions.py b/src/infuse_iot/generated/rpc_definitions.py index cfa7be1..7d68a58 100644 --- a/src/infuse_iot/generated/rpc_definitions.py +++ b/src/infuse_iot/generated/rpc_definitions.py @@ -493,6 +493,27 @@ class response(ctypes.LittleEndianStructure): _pack_ = 1 +class mem_read: + """Read arbitrary memory (NO ADDRESS VALIDATION PERFORMED)""" + + HELP = "Read arbitrary memory (NO ADDRESS VALIDATION PERFORMED)" + DESCRIPTION = "Read arbitrary memory (NO ADDRESS VALIDATION PERFORMED)" + COMMAND_ID = 15 + + class request(ctypes.LittleEndianStructure): + _fields_ = [ + ("address", ctypes.c_uint32), + ] + _pack_ = 1 + + class response(ctypes.LittleEndianStructure): + _fields_ = [ + ("sent_len", ctypes.c_uint32), + ("sent_crc", ctypes.c_uint32), + ] + _pack_ = 1 + + class lte_at_cmd: """Run AT command against LTE modem""" diff --git a/src/infuse_iot/rpc_client.py b/src/infuse_iot/rpc_client.py index 145b372..c1c380c 100644 --- a/src/infuse_iot/rpc_client.py +++ b/src/infuse_iot/rpc_client.py @@ -152,12 +152,13 @@ def run_data_recv_cmd( cmd_id: int, auth: Auth, params: bytes, + size: int, recv_cb: Callable[[int, bytes], None], rsp_decoder: Callable[[bytes], ctypes.LittleEndianStructure], ) -> tuple[rpc.ResponseHeader, ctypes.LittleEndianStructure | None]: self._request_id += 1 header = rpc.RequestHeader(self._request_id, cmd_id) - data_hdr = rpc.RequestDataHeader(0xFFFFFFFF, 0) + data_hdr = rpc.RequestDataHeader(size, 0) request_packet = bytes(header) + bytes(data_hdr) + params pkt = PacketOutput( diff --git a/src/infuse_iot/rpc_wrappers/sym_read.py b/src/infuse_iot/rpc_wrappers/sym_read.py new file mode 100644 index 0000000..9b56e5e --- /dev/null +++ b/src/infuse_iot/rpc_wrappers/sym_read.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 + +import binascii +import sys +from typing import Optional + +import tabulate +from elftools.dwarf.die import DIE +from elftools.elf.elffile import ELFFile + +import infuse_iot.generated.rpc_definitions as defs +from infuse_iot.commands import InfuseRpcCommand +from infuse_iot.util import console, elftools +from infuse_iot.util.argparse import ValidFile +from infuse_iot.zephyr.errno import errno + + +class sym_read(InfuseRpcCommand, defs.mem_read): + RPC_DATA_RECEIVE = True + + @classmethod + def add_parser(cls, parser): + parser.add_argument("--elf", type=ValidFile, help="ELF file to read symbol data from") + read_type = parser.add_mutually_exclusive_group(required=True) + read_type.add_argument("--sym", type=str, help="Symbol name to read") + read_type.add_argument("--addr", type=lambda x: int(x, 0), help="Address to read") + + def __init__(self, args): + # Ignore context-manager warning since ELFFile requires the file to remain opened + self.elf_file = open(args.elf, "rb") # noqa: SIM115 + self.elf = ELFFile(self.elf_file) + self.symbol_die: Optional[DIE] + + if args.sym: + symbols = elftools.symbols_from_name(self.elf, args.sym) + if len(symbols) == 0: + sys.exit(f"{args.sym} not found in '{args.elf}' symbol table") + elif len(symbols) == 1: + self.symbol_die = elftools.dwarf_die_from_symbol(self.elf, symbols[0]) + idx = 0 + else: + dies = [] + options = [] + # User readable selection requires the filename and line number + for s in symbols: + die = elftools.dwarf_die_from_symbol(self.elf, s) + if die is None: + continue + filename, linenum = elftools.dwarf_die_file_info(self.elf, die) + dies.append(die) + options.append(f"{filename}:{linenum}") + # Ask the user which symbol they mean + try: + idx, _ = console.choose_one(f"Multiple symbols matching '{args.sym}', choose one:", options) + except IndexError: + sys.exit("No symbol chosen...") + self.symbol_die = dies[idx] + + self.symbol = symbols[idx] + elif args.addr: + symbol = elftools.symbol_from_address(self.elf, args.addr) + if symbol is None: + sys.exit(f"Could not find symbol for address 0x{args.addr:08x} in '{args.elf}' symbol table") + self.symbol = symbol + else: + raise NotImplementedError("Unexpected symbol refrence") + + self.address = self.symbol.entry["st_value"] + self.num = self.symbol.entry["st_size"] + + dwarf_info = self.elf.get_dwarf_info() + if self.symbol_die is not None: + self.symbol_info = elftools.dwarf_die_variable_inf(dwarf_info, self.symbol_die) + + self.expected_offset = 0 + self.output = b"" + + def request_struct(self): + return self.request(self.address) + + def data_payload_recv_len(self): + return self.num + + def data_recv_cb(self, offset: int, data: bytes) -> None: + if offset != self.expected_offset: + missing = offset - self.expected_offset + print(f"Missed {missing:d} bytes from offset 0x{self.expected_offset:08x}") + self.output += b"\x00" * missing + + self.output += data + # Next expected offset + self.expected_offset = offset + len(data) + + def handle_response(self, return_code, response): + if return_code != 0: + print(f"Failed to read data logger ({errno.strerror(-return_code)})") + return + + if response.sent_len != len(self.output): + print(f"Unexpected received length ({response.sent_len} != {len(self.output)})") + return + + if response.sent_crc != binascii.crc32(self.output): + print(f"Unexpected received length ({response.sent_crc:08x} != {binascii.crc32(self.output)}:08x)") + return + + if self.elf is None: + # Hexdump the received payload + for offset in range(0, len(self.output), 16): + print(f"{self.address + offset:08x}: {self.output[offset : offset + 16].hex()}") + return + + # Parse returned value + symbol_size = self.symbol.entry["st_size"] + assert len(self.output) >= symbol_size + + if self.symbol_die is not None: + filename, linenum = elftools.dwarf_die_file_info(self.elf, self.symbol_die) + print(f" Symbol: {self.symbol.name} ({filename}:{linenum})") + + address_base = self.symbol.entry["st_value"] + print(f"Address: 0x{address_base:x}") + print(f" Size: {symbol_size} bytes") + if symbol_size <= 32: + print(f" Raw: {self.output.hex()}") + else: + print(f" Raw: {self.output[:32].hex()}...") + + def info_table(info, offset=0): + table = [[f"{' ' * offset}{info.name}", f"({info.tag}) ({info.ctype}) {info.offset}", ""]] + for child in info.children: + table += info_table(child, offset + 1) + return table + + def field_table(info: elftools.dwarf_field, buffer: bytes, offset: int = 0): + if info.ctype is None: + table = [[f"0x{address_base + info.offset:08x}", f"{' ' * offset}{info.name}", "", "", ""]] + else: + value = info.ctype.from_buffer_copy(buffer, info.offset).value + value_hex = hex(value) if not isinstance(value, float) else "N/A" + points_to = "" + if info.tag == "DW_TAG_pointer_type": + if value == 0x00: + points_to = "NULL" + else: + sym = elftools.symbol_from_address(self.elf, value) + if sym: + ptr_offset = "" + if value != sym.entry["st_value"]: + ptr_offset = f" (+ {value - sym.entry['st_value']})" + points_to = f"{sym.name}{ptr_offset}" + else: + points_to = "" + table = [ + [f"0x{address_base + info.offset:08x}", f"{' ' * offset}{info.name}", value, value_hex, points_to] + ] + for child in info.children: + table += field_table(child, buffer, offset + 1) + return table + + tabulate.PRESERVE_WHITESPACE = True + print( + tabulate.tabulate(field_table(self.symbol_info, self.output), ["Address", "Field", "Value", "Hex", "Ptr"]) + ) diff --git a/src/infuse_iot/tools/rpc.py b/src/infuse_iot/tools/rpc.py index ebf1b16..c3e78ea 100644 --- a/src/infuse_iot/tools/rpc.py +++ b/src/infuse_iot/tools/rpc.py @@ -99,6 +99,7 @@ def run(self): self._command.COMMAND_ID, # type: ignore self._command.auth_level(), params, + self._command.data_payload_recv_len(), self._command.data_recv_cb, decode_fn, ) diff --git a/src/infuse_iot/util/console.py b/src/infuse_iot/util/console.py index 1ee953b..e3655a9 100644 --- a/src/infuse_iot/util/console.py +++ b/src/infuse_iot/util/console.py @@ -5,6 +5,11 @@ import colorama +try: + from simple_term_menu import TerminalMenu +except NotImplementedError: + pass + class Console: """Common terminal logging functions""" @@ -47,3 +52,30 @@ def log(timestamp: datetime.datetime, colour, string: str): """Log colourised string to terminal""" ts = timestamp.strftime("%H:%M:%S.%f")[:-3] print(f"[{ts}]{colour} {string}") + + +def choose_one(title: str, options: list[str]) -> tuple[int, str]: + """Select a single option from a list""" + + if TerminalMenu: + # Linux & MacOS + terminal_menu = TerminalMenu(options, title=title) + idx = terminal_menu.show() + if idx is None: + raise IndexError("No option chosen") + return idx, options[idx] + else: + # Windows + print(title) + for idx, option in enumerate(options): + print(f" {idx:2d}: {option}") + idx = None + while idx is None: + try: + idx = int(input(f"Enter index between 0 and {len(options) - 1}:")) + if not (0 <= idx < len(options)): + idx = None + except ValueError: + pass + + return idx, options[idx] diff --git a/src/infuse_iot/util/elftools.py b/src/infuse_iot/util/elftools.py new file mode 100644 index 0000000..e3c943c --- /dev/null +++ b/src/infuse_iot/util/elftools.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 + +import ctypes +from typing import Optional + +from elftools.dwarf.die import DIE +from elftools.dwarf.dwarf_expr import DW_OP_name2opcode +from elftools.dwarf.dwarfinfo import DWARFInfo +from elftools.elf.elffile import ELFFile +from elftools.elf.sections import Symbol, SymbolTableSection +from typing_extensions import Self + +TYPEDEF_TO_CTYPE: dict[str, type[ctypes._SimpleCData]] = { + "int8_t": ctypes.c_int8, + "uint8_t": ctypes.c_uint8, + "int16_t": ctypes.c_int16, + "uint16_t": ctypes.c_uint16, + "int32_t": ctypes.c_int32, + "uint32_t": ctypes.c_uint32, + "int64_t": ctypes.c_int64, + "uint64_t": ctypes.c_uint64, +} + +DWARF_TO_CTYPE: dict[str, type[ctypes._SimpleCData]] = { + "_Bool": ctypes.c_bool, + "unsigned char": ctypes.c_ubyte, + "signed char": ctypes.c_byte, + "char": ctypes.c_byte, + "short unsigned int": ctypes.c_uint16, + "unsigned int": ctypes.c_uint32, + "int": ctypes.c_int32, + "long int": ctypes.c_int32, + "long long int": ctypes.c_int64, + "long long unsigned int": ctypes.c_uint64, + "float": ctypes.c_float, + "double": ctypes.c_double, +} + + +def symbols_from_name(elf: ELFFile, name: str) -> list[Symbol]: + """Get a list of symbols from an ELF file with names matching the provided string""" + symtab = None + + # Locate the symbol table + for section in elf.iter_sections(): + if isinstance(section, SymbolTableSection): + symtab = section + break + + if not symtab: + return [] + + symbols = [] + # Search for the symbol in the symbol table + for symbol in symtab.iter_symbols(): + if symbol.name == name: + symbols.append(symbol) + return symbols + + +def symbol_from_address(elf: ELFFile, address: int) -> Optional[Symbol]: + """Get a list of symbols from an ELF file with names matching the provided string""" + symtab = None + + # Locate the symbol table + for section in elf.iter_sections(): + if isinstance(section, SymbolTableSection): + symtab = section + break + + if not symtab: + return None + + # Search for the symbol in the symbol table + for symbol in symtab.iter_symbols(): + start = symbol.entry["st_value"] + size = symbol.entry["st_size"] + end = start + size - 1 + if start <= address <= end: + return symbol + return None + + +def dwarf_die_from_symbol(elf: ELFFile, symbol: Symbol) -> DIE | None: + """Get a Debug Information Entry associated with a symbol (Global variables only)""" + dwarfinfo = elf.get_dwarf_info() + + for CU in dwarfinfo.iter_CUs(): + for die in CU.iter_DIEs(): + if die.tag == "DW_TAG_variable" and "DW_AT_name" in die.attributes and "DW_AT_location" in die.attributes: + die_name = die.attributes["DW_AT_name"].value.decode("utf-8") + die_location = die.attributes.get("DW_AT_location").value + # Not our symbol + if die_name != symbol.name: + continue + # Constant addresses are in a list of form [0x03, addr_bytes] + if not isinstance(die_location, list): + continue + if die_location[0] != DW_OP_name2opcode["DW_OP_addr"]: + continue + address = int.from_bytes(die_location[1:], "little") + if address == symbol.entry["st_value"]: + return die + return None + + +def dwarf_die_file_info(elf: ELFFile, die: DIE) -> tuple[Optional[str], int]: + file_attr = die.attributes["DW_AT_decl_file"] + line_attr = die.attributes["DW_AT_decl_line"] + + dwarfinfo = elf.get_dwarf_info() + lineprogram = dwarfinfo.line_program_for_CU(die.cu) + if lineprogram is None: + cu_filename = None + else: + cu_filename = lineprogram["file_entry"][file_attr.value - 1].name.decode("latin-1") + + return cu_filename, line_attr.value + + +class dwarf_field: + def __init__( + self, + name: str, + tag: str, + ctype: Optional[type[ctypes._SimpleCData]], + children: Optional[list[Self]], + offset: int, + ): + self.name = name + self.tag = tag + self.ctype = ctype + if children is None: + self.children = [] + else: + self.children = children + self.offset = offset + + +def _type_from_dwarf_info(dwarfinfo: DWARFInfo, die: DIE): + refaddr = die.attributes["DW_AT_type"].value + die.cu.cu_offset + return dwarfinfo.get_DIE_from_refaddr(refaddr, die.cu) + + +def dwarf_die_variable_inf( + dwarfinfo: DWARFInfo, die: DIE, offset: int = 0, name_override: Optional[str] = None +) -> dwarf_field: + type_die = _type_from_dwarf_info(dwarfinfo, die) + + if "DW_AT_name" in die.attributes: + field_name = die.attributes["DW_AT_name"].value.decode("utf-8") + else: + field_name = "" + if "DW_AT_name" in type_die.attributes: + type_name = type_die.attributes["DW_AT_name"].value.decode("utf-8") + else: + type_name = None + info_name = name_override if name_override is not None else field_name + + # print(field_name, type_name, type_die.tag) + + if type_die.tag == "DW_TAG_array_type": + count = 0 + for child in type_die.iter_children(): + if child.tag == "DW_TAG_subrange_type" and "DW_AT_upper_bound" in child.attributes: + count = child.attributes["DW_AT_upper_bound"].value + 1 + element_die = _type_from_dwarf_info(dwarfinfo, type_die) + + children = [] + element_offset = 0 + for idx in range(count): + child = dwarf_die_variable_inf(dwarfinfo, type_die, offset + element_offset, f"{info_name}[{idx}]") + if "DW_AT_byte_size" in element_die.attributes: + element_offset += element_die.attributes["DW_AT_byte_size"].value + else: + element_offset += ctypes.sizeof(child.ctype) + children.append(child) + + return dwarf_field(info_name, type_die.tag, None, children, offset) + elif type_die.tag == "DW_TAG_structure_type": + children = [] + for child in type_die.iter_children(): + field_offset = child.attributes["DW_AT_data_member_location"].value + child_field = dwarf_die_variable_inf(dwarfinfo, child, offset + field_offset) + children.append(child_field) + + return dwarf_field(info_name, type_die.tag, None, children, offset) + + elif type_die.tag == "DW_TAG_union_type": + children = [] + for child in type_die.iter_children(): + child_field = dwarf_die_variable_inf(dwarfinfo, child, offset) + children.append(child_field) + return dwarf_field(info_name, type_die.tag, None, children, offset) + elif type_die.tag == "DW_TAG_typedef": + if type_name in TYPEDEF_TO_CTYPE: + return dwarf_field(info_name, type_die.tag, TYPEDEF_TO_CTYPE[type_name], None, offset) + else: + return dwarf_die_variable_inf(dwarfinfo, type_die, offset, name_override) + elif type_die.tag == "DW_TAG_base_type": + if type_name not in DWARF_TO_CTYPE: + raise NotImplementedError(f"'{type_name}' not known in DWARF_TO_CTYPE") + return dwarf_field(info_name, type_die.tag, DWARF_TO_CTYPE[type_name], None, offset) + elif type_die.tag == "DW_TAG_pointer_type": + return dwarf_field(info_name, type_die.tag, ctypes.c_uint32, None, offset) + elif type_die.tag in ["DW_TAG_enumeration_type", "DW_TAG_const_type"]: + return dwarf_die_variable_inf(dwarfinfo, type_die, offset) + else: + raise NotImplementedError(type_die.tag)