Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/infuse_iot/generated/rpc_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class rpc_struct_kv_store_crc(ctypes.LittleEndianStructure):

_fields_ = [
("id", ctypes.c_uint16),
("crc", ctypes.c_int32),
("crc", ctypes.c_uint32),
]
_pack_ = 1

Expand Down Expand Up @@ -111,10 +111,10 @@ class rpc_struct_lte_state(ctypes.LittleEndianStructure):
("mnc", ctypes.c_uint16),
("cell_id", ctypes.c_uint32),
("tac", ctypes.c_uint32),
("tau", ctypes.c_uint32),
("tau", ctypes.c_int32),
("earfcn", ctypes.c_uint16),
("band", ctypes.c_uint8),
("psm_active_time", ctypes.c_uint16),
("psm_active_time", ctypes.c_int16),
("edrx_interval", ctypes.c_float),
("edrx_paging_window", ctypes.c_float),
("rsrp", ctypes.c_int16),
Expand Down
7 changes: 6 additions & 1 deletion src/infuse_iot/rpc_wrappers/kv_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ def handle_response(self, return_code, response):
for r in response:
if r.len > 0:
b = bytes(r.data)
kv_type = kv.slots.ID_MAPPING[r.id]
try:
kv_type = kv.slots.ID_MAPPING[r.id]
except KeyError:
print(f"Key: {r.id} ({r.len} bytes):")
print(f"\tHex: {b.hex()}")
continue
kv_val = kv_type.vla_from_buffer_copy(b)

print(f"Key: {kv_type.NAME} ({r.len} bytes):")
Expand Down
40 changes: 40 additions & 0 deletions src/infuse_iot/rpc_wrappers/kv_reflect_crcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python3

import ctypes
import os

import infuse_iot.generated.rpc_definitions as defs
from infuse_iot.commands import InfuseRpcCommand
from infuse_iot.util.ctypes import VLACompatLittleEndianStruct


class kv_reflect_crcs(InfuseRpcCommand, defs.kv_reflect_crcs):
HELP = "Read KV store reflection crc values"
DESCRIPTION = "Read KV store reflection crc values"

class response(VLACompatLittleEndianStruct):
_fields_ = [
("num", ctypes.c_uint16),
("remaining", ctypes.c_uint16),
]
vla_field = ("crcs", 0 * defs.rpc_struct_kv_store_crc)
_pack_ = 1

@classmethod
def add_parser(cls, parser):
parser.add_argument("--offset", type=int, default=0, help="Offset to start CRC read at")

def __init__(self, args):
self.offset = args.offset

def request_struct(self):
return self.request(self.offset)

def handle_response(self, return_code, response):
if return_code != 0:
print(f"Invalid query ({os.strerror(-return_code)})")
return

print(f"Slot CRCs ({response.remaining} remaining):")
for slot in response.crcs:
print(f"\t{slot.id:5d}: 0x{slot.crc:08x}")
63 changes: 63 additions & 0 deletions src/infuse_iot/rpc_wrappers/kv_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python3

import ctypes
import os

import infuse_iot.generated.rpc_definitions as defs
from infuse_iot.commands import InfuseRpcCommand
from infuse_iot.util.ctypes import VLACompatLittleEndianStruct, bytes_to_uint8


class kv_write(InfuseRpcCommand, defs.kv_write):
HELP = "Write an arbitrary kv value"
DESCRIPTION = "Write an arbitrary kv value"

class request(ctypes.LittleEndianStructure):
_fields_ = [
("num", ctypes.c_uint8),
]
_pack_ = 1

class response(VLACompatLittleEndianStruct):
vla_field = ("rc", 0 * ctypes.c_int16)

@staticmethod
def kv_store_value_factory(id, value_bytes):
class kv_store_value(ctypes.LittleEndianStructure):
_fields_ = [
("id", ctypes.c_uint16),
("len", ctypes.c_uint16),
("data", ctypes.c_ubyte * len(value_bytes)),
]
_pack_ = 1

return kv_store_value(id, len(value_bytes), bytes_to_uint8(value_bytes))

@classmethod
def add_parser(cls, parser):
parser.add_argument("--key", type=int, required=True, help="KV key ID")
parser.add_argument("--value", type=str, required=True, help="KV value as hex string")

def __init__(self, args):
self.key = args.key
self.value = bytes.fromhex(args.value)

def request_struct(self):
kv_struct = self.kv_store_value_factory(self.key, self.value)
request_bytes = bytes(kv_struct)
return bytes(self.request(1)) + request_bytes

def handle_response(self, return_code, response):
if return_code != 0:
print(f"Invalid data buffer ({os.strerror(-return_code)})")
return

def print_status(name, rc):
if rc < 0:
print(f"{name} failed to write ({os.strerror(-rc)})")
elif rc == 0:
print(f"{name} already matched")
else:
print(f"{name} updated")

print_status(f"{self.key} value", response.rc[0])
4 changes: 2 additions & 2 deletions src/infuse_iot/rpc_wrappers/lte_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ class lte_state_struct(ctypes.LittleEndianStructure):
("mnc", ctypes.c_uint16),
("cell_id", ctypes.c_uint32),
("tac", ctypes.c_uint32),
("tau", ctypes.c_uint32),
("tau", ctypes.c_int32),
("earfcn", ctypes.c_uint16),
("band", ctypes.c_uint8),
("psm_active_time", ctypes.c_uint16),
("psm_active_time", ctypes.c_int16),
("edrx_interval", ctypes.c_float),
("edrx_window", ctypes.c_float),
("rsrp", ctypes.c_int16),
Expand Down
2 changes: 1 addition & 1 deletion src/infuse_iot/tools/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def run(self):
interface: soc.ProvisioningInterface
if self._vendor == "nrf":
interface = nrf.Interface(self._snr)
if self._vendor == "stm":
elif self._vendor == "stm":
interface = stm.Interface()
else:
raise NotImplementedError(f"Unhandled vendor '{self._vendor}'")
Expand Down
2 changes: 1 addition & 1 deletion src/infuse_iot/tools/tdf_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def run(self) -> None:
if idx == 0:
if tdf.time is not None:
if tdf.period is None:
time = ""
time = InfuseTime.utc_time_string(tdf.time)
else:
offset = (len(tdf.data) - 1) * tdf.period
time = InfuseTime.utc_time_string(tdf.time + offset)
Expand Down
Loading