Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/infuse_iot/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def data_payload(self) -> bytes:
"""Payload to send with RPC_DATA"""
raise NotImplementedError

def data_payload_recv_len(self) -> int:
"""Length of payload to receive with RPC_DATA"""
return 0xFFFFFFFF

def data_recv_cb(self, offset: int, data: bytes) -> None:
"""Data received callback"""

Expand Down
62 changes: 62 additions & 0 deletions src/infuse_iot/generated/kv_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,35 @@ class geofence(VLACompatLittleEndianStruct):
vla_field = ("points", 0 * structs.gcs_location)
_pack_ = 1

class task_schedules_default_id(VLACompatLittleEndianStruct):
"""Unique identifier for default schedule set"""

NAME = "TASK_SCHEDULES_DEFAULT_ID"
BASE_ID = 1000
RANGE = 1
_fields_ = [
("set_id", ctypes.c_uint32),
]
_pack_ = 1

class task_schedules(VLACompatLittleEndianStruct):
"""Task runner task schedule definition (@ref task_schedule)"""

NAME = "TASK_SCHEDULES"
BASE_ID = 1001
RANGE = 32
_fields_ = [
("task_id", ctypes.c_uint8),
("validity", ctypes.c_uint8),
("periodicity_type", ctypes.c_uint8),
("timeout_s", ctypes.c_uint32),
("battery_start_threshold", ctypes.c_uint8),
("battery_terminate_threshold", ctypes.c_uint8),
("periodicity", ctypes.c_uint32),
]
vla_field = ("_remainder", 0 * ctypes.c_uint8)
_pack_ = 1

class secure_storage_reserved(VLACompatLittleEndianStruct):
"""Keys reserved for secure storage (do not enable)"""

Expand Down Expand Up @@ -328,6 +357,39 @@ class secure_storage_reserved(VLACompatLittleEndianStruct):
113: geofence,
114: geofence,
115: geofence,
1000: task_schedules_default_id,
1001: task_schedules,
1002: task_schedules,
1003: task_schedules,
1004: task_schedules,
1005: task_schedules,
1006: task_schedules,
1007: task_schedules,
1008: task_schedules,
1009: task_schedules,
1010: task_schedules,
1011: task_schedules,
1012: task_schedules,
1013: task_schedules,
1014: task_schedules,
1015: task_schedules,
1016: task_schedules,
1017: task_schedules,
1018: task_schedules,
1019: task_schedules,
1020: task_schedules,
1021: task_schedules,
1022: task_schedules,
1023: task_schedules,
1024: task_schedules,
1025: task_schedules,
1026: task_schedules,
1027: task_schedules,
1028: task_schedules,
1029: task_schedules,
1030: task_schedules,
1031: task_schedules,
1032: task_schedules,
30000: secure_storage_reserved,
30001: secure_storage_reserved,
30002: secure_storage_reserved,
Expand Down
21 changes: 21 additions & 0 deletions src/infuse_iot/generated/rpc_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,27 @@ class response(ctypes.LittleEndianStructure):
_pack_ = 1


class mem_read:
"""Read arbitrary memory (NO ADDRESS VALIDATION PERFORMED)"""

HELP = "Read arbitrary memory (NO ADDRESS VALIDATION PERFORMED)"
DESCRIPTION = "Read arbitrary memory (NO ADDRESS VALIDATION PERFORMED)"
COMMAND_ID = 15

class request(ctypes.LittleEndianStructure):
_fields_ = [
("address", ctypes.c_uint32),
]
_pack_ = 1

class response(ctypes.LittleEndianStructure):
_fields_ = [
("sent_len", ctypes.c_uint32),
("sent_crc", ctypes.c_uint32),
]
_pack_ = 1


class lte_at_cmd:
"""Run AT command against LTE modem"""

Expand Down
3 changes: 2 additions & 1 deletion src/infuse_iot/rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,13 @@ def run_data_recv_cmd(
cmd_id: int,
auth: Auth,
params: bytes,
size: int,
recv_cb: Callable[[int, bytes], None],
rsp_decoder: Callable[[bytes], ctypes.LittleEndianStructure],
) -> tuple[rpc.ResponseHeader, ctypes.LittleEndianStructure | None]:
self._request_id += 1
header = rpc.RequestHeader(self._request_id, cmd_id)
data_hdr = rpc.RequestDataHeader(0xFFFFFFFF, 0)
data_hdr = rpc.RequestDataHeader(size, 0)

request_packet = bytes(header) + bytes(data_hdr) + params
pkt = PacketOutput(
Expand Down
164 changes: 164 additions & 0 deletions src/infuse_iot/rpc_wrappers/sym_read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#!/usr/bin/env python3

import binascii
import sys
from typing import Optional

import tabulate
from elftools.dwarf.die import DIE
from elftools.elf.elffile import ELFFile

import infuse_iot.generated.rpc_definitions as defs
from infuse_iot.commands import InfuseRpcCommand
from infuse_iot.util import console, elftools
from infuse_iot.util.argparse import ValidFile
from infuse_iot.zephyr.errno import errno


class sym_read(InfuseRpcCommand, defs.mem_read):
RPC_DATA_RECEIVE = True

@classmethod
def add_parser(cls, parser):
parser.add_argument("--elf", type=ValidFile, help="ELF file to read symbol data from")
read_type = parser.add_mutually_exclusive_group(required=True)
read_type.add_argument("--sym", type=str, help="Symbol name to read")
read_type.add_argument("--addr", type=lambda x: int(x, 0), help="Address to read")

def __init__(self, args):
# Ignore context-manager warning since ELFFile requires the file to remain opened
self.elf_file = open(args.elf, "rb") # noqa: SIM115
self.elf = ELFFile(self.elf_file)
self.symbol_die: Optional[DIE]

if args.sym:
symbols = elftools.symbols_from_name(self.elf, args.sym)
if len(symbols) == 0:
sys.exit(f"{args.sym} not found in '{args.elf}' symbol table")
elif len(symbols) == 1:
self.symbol_die = elftools.dwarf_die_from_symbol(self.elf, symbols[0])
idx = 0
else:
dies = []
options = []
# User readable selection requires the filename and line number
for s in symbols:
die = elftools.dwarf_die_from_symbol(self.elf, s)
if die is None:
continue
filename, linenum = elftools.dwarf_die_file_info(self.elf, die)
dies.append(die)
options.append(f"{filename}:{linenum}")
# Ask the user which symbol they mean
try:
idx, _ = console.choose_one(f"Multiple symbols matching '{args.sym}', choose one:", options)
except IndexError:
sys.exit("No symbol chosen...")
self.symbol_die = dies[idx]

self.symbol = symbols[idx]
elif args.addr:
symbol = elftools.symbol_from_address(self.elf, args.addr)
if symbol is None:
sys.exit(f"Could not find symbol for address 0x{args.addr:08x} in '{args.elf}' symbol table")
self.symbol = symbol
else:
raise NotImplementedError("Unexpected symbol refrence")

self.address = self.symbol.entry["st_value"]
self.num = self.symbol.entry["st_size"]

dwarf_info = self.elf.get_dwarf_info()
if self.symbol_die is not None:
self.symbol_info = elftools.dwarf_die_variable_inf(dwarf_info, self.symbol_die)

self.expected_offset = 0
self.output = b""

def request_struct(self):
return self.request(self.address)

def data_payload_recv_len(self):
return self.num

def data_recv_cb(self, offset: int, data: bytes) -> None:
if offset != self.expected_offset:
missing = offset - self.expected_offset
print(f"Missed {missing:d} bytes from offset 0x{self.expected_offset:08x}")
self.output += b"\x00" * missing

self.output += data
# Next expected offset
self.expected_offset = offset + len(data)

def handle_response(self, return_code, response):
if return_code != 0:
print(f"Failed to read data logger ({errno.strerror(-return_code)})")
return

if response.sent_len != len(self.output):
print(f"Unexpected received length ({response.sent_len} != {len(self.output)})")
return

if response.sent_crc != binascii.crc32(self.output):
print(f"Unexpected received length ({response.sent_crc:08x} != {binascii.crc32(self.output)}:08x)")
return

if self.elf is None:
# Hexdump the received payload
for offset in range(0, len(self.output), 16):
print(f"{self.address + offset:08x}: {self.output[offset : offset + 16].hex()}")
return

# Parse returned value
symbol_size = self.symbol.entry["st_size"]
assert len(self.output) >= symbol_size

if self.symbol_die is not None:
filename, linenum = elftools.dwarf_die_file_info(self.elf, self.symbol_die)
print(f" Symbol: {self.symbol.name} ({filename}:{linenum})")

address_base = self.symbol.entry["st_value"]
print(f"Address: 0x{address_base:x}")
print(f" Size: {symbol_size} bytes")
if symbol_size <= 32:
print(f" Raw: {self.output.hex()}")
else:
print(f" Raw: {self.output[:32].hex()}...")

def info_table(info, offset=0):
table = [[f"{' ' * offset}{info.name}", f"({info.tag}) ({info.ctype}) {info.offset}", ""]]
for child in info.children:
table += info_table(child, offset + 1)
return table

def field_table(info: elftools.dwarf_field, buffer: bytes, offset: int = 0):
if info.ctype is None:
table = [[f"0x{address_base + info.offset:08x}", f"{' ' * offset}{info.name}", "", "", ""]]
else:
value = info.ctype.from_buffer_copy(buffer, info.offset).value
value_hex = hex(value) if not isinstance(value, float) else "N/A"
points_to = ""
if info.tag == "DW_TAG_pointer_type":
if value == 0x00:
points_to = "NULL"
else:
sym = elftools.symbol_from_address(self.elf, value)
if sym:
ptr_offset = ""
if value != sym.entry["st_value"]:
ptr_offset = f" (+ {value - sym.entry['st_value']})"
points_to = f"{sym.name}{ptr_offset}"
else:
points_to = "<unknown>"
table = [
[f"0x{address_base + info.offset:08x}", f"{' ' * offset}{info.name}", value, value_hex, points_to]
]
for child in info.children:
table += field_table(child, buffer, offset + 1)
return table

tabulate.PRESERVE_WHITESPACE = True
print(
tabulate.tabulate(field_table(self.symbol_info, self.output), ["Address", "Field", "Value", "Hex", "Ptr"])
)
1 change: 1 addition & 0 deletions src/infuse_iot/tools/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def run(self):
self._command.COMMAND_ID, # type: ignore
self._command.auth_level(),
params,
self._command.data_payload_recv_len(),
self._command.data_recv_cb,
decode_fn,
)
Expand Down
32 changes: 32 additions & 0 deletions src/infuse_iot/util/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

import colorama

try:
from simple_term_menu import TerminalMenu
except NotImplementedError:
pass


class Console:
"""Common terminal logging functions"""
Expand Down Expand Up @@ -47,3 +52,30 @@ def log(timestamp: datetime.datetime, colour, string: str):
"""Log colourised string to terminal"""
ts = timestamp.strftime("%H:%M:%S.%f")[:-3]
print(f"[{ts}]{colour} {string}")


def choose_one(title: str, options: list[str]) -> tuple[int, str]:
"""Select a single option from a list"""

if TerminalMenu:
# Linux & MacOS
terminal_menu = TerminalMenu(options, title=title)
idx = terminal_menu.show()
if idx is None:
raise IndexError("No option chosen")
return idx, options[idx]
else:
# Windows
print(title)
for idx, option in enumerate(options):
print(f" {idx:2d}: {option}")
idx = None
while idx is None:
try:
idx = int(input(f"Enter index between 0 and {len(options) - 1}:"))
if not (0 <= idx < len(options)):
idx = None
except ValueError:
pass

return idx, options[idx]
Loading