Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions src/infuse_iot/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,3 @@ def data_progress_cb(self, offset: int) -> None:
def handle_response(self, return_code, response):
"""Handle RPC_RSP"""
raise NotImplementedError

class VariableSizeResponse:
base_fields: list[tuple[str, type[ctypes._SimpleCData]]] = []
var_name = "x"
var_type: type[ctypes._SimpleCData] = ctypes.c_ubyte

@classmethod
def from_buffer_copy(cls, source, offset=0):
class response_base(ctypes.LittleEndianStructure):
_fields_ = cls.base_fields
_pack_ = 1

var_bytes = (len(source) - offset) - ctypes.sizeof(response_base)
assert var_bytes % ctypes.sizeof(cls.var_type) == 0
var_num = var_bytes // ctypes.sizeof(cls.var_type)

class response(ctypes.LittleEndianStructure):
_fields_ = [*cls.base_fields, (cls.var_name, cls.var_type * var_num)]
_pack_ = 1

return response.from_buffer_copy(source, offset)
308 changes: 308 additions & 0 deletions src/infuse_iot/generated/kv_definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
#!/usr/bin/env python3
# mypy: ignore-errors

"""Autogenerated KV store definitions"""

import ctypes

from infuse_iot.util.ctypes import VLACompatLittleEndianStruct


class structs:
class gcs_location(VLACompatLittleEndianStruct):
"""Geographic Co-ordinate System location"""

_fields_ = [
("latitude", ctypes.c_int32),
("longitude", ctypes.c_int32),
("height", ctypes.c_int32),
]
_pack_ = 1

class bt_addr_le(VLACompatLittleEndianStruct):
"""Bluetooth address type (bt_addr_le_t)"""

_fields_ = [
("type", ctypes.c_uint8),
("val", 6 * ctypes.c_uint8),
]
_pack_ = 1

class kv_string(VLACompatLittleEndianStruct):
"""String type"""

_fields_ = [
("value_num", ctypes.c_uint8),
]
vla_field = ("value", 0 * ctypes.c_char)
_pack_ = 1

class kv_mcuboot_img_sem_ver(VLACompatLittleEndianStruct):
"""MCUboot semantic versioning struct"""

_fields_ = [
("major", ctypes.c_uint8),
("minor", ctypes.c_uint8),
("revision", ctypes.c_uint16),
("build_num", ctypes.c_uint32),
]
_pack_ = 1


class slots:
class reboots(VLACompatLittleEndianStruct):
"""Number of times device has rebooted"""

NAME = "REBOOTS"
BASE_ID = 0
RANGE = 1
_fields_ = [
("count", ctypes.c_uint32),
]
_pack_ = 1

class bluetooth_addr(VLACompatLittleEndianStruct):
"""Bluetooth advertising address"""

NAME = "BLUETOOTH_ADDR"
BASE_ID = 1
RANGE = 1
_fields_ = [
("address", structs.bt_addr_le),
]
_pack_ = 1

class exfat_disk_info(VLACompatLittleEndianStruct):
"""exFAT disk information"""

NAME = "EXFAT_DISK_INFO"
BASE_ID = 2
RANGE = 1
_fields_ = [
("block_count", ctypes.c_uint32),
("block_size", ctypes.c_uint32),
]
_pack_ = 1

class bluetooth_ctlr_version(VLACompatLittleEndianStruct):
"""External Bluetooth controller version"""

NAME = "BLUETOOTH_CTLR_VERSION"
BASE_ID = 3
RANGE = 1
_fields_ = [
("application", ctypes.c_uint32),
("version", structs.kv_mcuboot_img_sem_ver),
]
_pack_ = 1

class fixed_location(VLACompatLittleEndianStruct):
"""Fixed global location of the device"""

NAME = "FIXED_LOCATION"
BASE_ID = 10
RANGE = 1
_fields_ = [
("location", structs.gcs_location),
]
_pack_ = 1

class wifi_ssid(VLACompatLittleEndianStruct):
"""WiFi network name"""

NAME = "WIFI_SSID"
BASE_ID = 20
RANGE = 1
_fields_ = [
]
vla_field = ("ssid", structs.kv_string)
_pack_ = 1

class wifi_psk(VLACompatLittleEndianStruct):
"""WiFi network password"""

NAME = "WIFI_PSK"
BASE_ID = 21
RANGE = 1
_fields_ = [
]
vla_field = ("psk", structs.kv_string)
_pack_ = 1

class ntp_server_url(VLACompatLittleEndianStruct):
"""URL of the NTP server to use for time synchronisation"""

NAME = "NTP_SERVER_URL"
BASE_ID = 30
RANGE = 1
_fields_ = [
]
vla_field = ("url", structs.kv_string)
_pack_ = 1

class epacket_udp_url(VLACompatLittleEndianStruct):
"""ePacket UDP server hostname"""

NAME = "EPACKET_UDP_URL"
BASE_ID = 31
RANGE = 1
_fields_ = [
]
vla_field = ("server", structs.kv_string)
_pack_ = 1

class epacket_udp_port(VLACompatLittleEndianStruct):
"""ePacket UDP server port"""

NAME = "EPACKET_UDP_PORT"
BASE_ID = 32
RANGE = 1
_fields_ = [
("port", ctypes.c_uint32),
]
_pack_ = 1

class lte_modem_model(VLACompatLittleEndianStruct):
"""Modem model as returned by AT+CGMM"""

NAME = "LTE_MODEM_MODEL"
BASE_ID = 40
RANGE = 1
_fields_ = [
]
vla_field = ("model", structs.kv_string)
_pack_ = 1

class lte_modem_firmware_revision(VLACompatLittleEndianStruct):
"""Modem firmware revision as returned by AT+CGMR"""

NAME = "LTE_MODEM_FIRMWARE_REVISION"
BASE_ID = 41
RANGE = 1
_fields_ = [
]
vla_field = ("revision", structs.kv_string)
_pack_ = 1

class lte_modem_esn(VLACompatLittleEndianStruct):
"""'Electronic Serial Number' as returned by AT+CGSN=0"""

NAME = "LTE_MODEM_ESN"
BASE_ID = 42
RANGE = 1
_fields_ = [
]
vla_field = ("esn", structs.kv_string)
_pack_ = 1

class lte_modem_imei(VLACompatLittleEndianStruct):
"""'International Modem Equiment Identifier' as returned by AT+CGSN=1"""

NAME = "LTE_MODEM_IMEI"
BASE_ID = 43
RANGE = 1
_fields_ = [
("imei", ctypes.c_uint64),
]
_pack_ = 1

class lte_sim_uicc(VLACompatLittleEndianStruct):
"""SIM Universal Identifier (https://www.itu.int/en/ITU-T/inr/forms/Pages/iin.aspx)"""

NAME = "LTE_SIM_UICC"
BASE_ID = 44
RANGE = 1
_fields_ = [
]
vla_field = ("uicc", structs.kv_string)
_pack_ = 1

class lte_pdp_config(VLACompatLittleEndianStruct):
"""Packet Data Protocol (PDP) default context configuration"""

NAME = "LTE_PDP_CONFIG"
BASE_ID = 45
RANGE = 1
_fields_ = [
]
vla_field = ("apn", structs.kv_string)
_pack_ = 1

class bluetooth_peer(VLACompatLittleEndianStruct):
"""Bluetooth peer device"""

NAME = "BLUETOOTH_PEER"
BASE_ID = 50
RANGE = 1
_fields_ = [
("address", structs.bt_addr_le),
]
_pack_ = 1

class geofence(VLACompatLittleEndianStruct):
"""Array of points defining a closed polygon"""

NAME = "GEOFENCE"
BASE_ID = 100
RANGE = 16
_fields_ = [
]
vla_field = ("points", 0 * structs.gcs_location)
_pack_ = 1

class secure_storage_reserved(VLACompatLittleEndianStruct):
"""Keys reserved for secure storage (do not enable)"""

NAME = "SECURE_STORAGE_RESERVED"
BASE_ID = 30000
RANGE = 10
_fields_ = [
]
vla_field = ("data", 0 * ctypes.c_uint8)
_pack_ = 1

ID_MAPPING = {
0: reboots,
1: bluetooth_addr,
2: exfat_disk_info,
3: bluetooth_ctlr_version,
10: fixed_location,
20: wifi_ssid,
21: wifi_psk,
30: ntp_server_url,
31: epacket_udp_url,
32: epacket_udp_port,
40: lte_modem_model,
41: lte_modem_firmware_revision,
42: lte_modem_esn,
43: lte_modem_imei,
44: lte_sim_uicc,
45: lte_pdp_config,
50: bluetooth_peer,
100: geofence,
101: geofence,
102: geofence,
103: geofence,
104: geofence,
105: geofence,
106: geofence,
107: geofence,
108: geofence,
109: geofence,
110: geofence,
111: geofence,
112: geofence,
113: geofence,
114: geofence,
115: geofence,
30000: secure_storage_reserved,
30001: secure_storage_reserved,
30002: secure_storage_reserved,
30003: secure_storage_reserved,
30004: secure_storage_reserved,
30005: secure_storage_reserved,
30006: secure_storage_reserved,
30007: secure_storage_reserved,
30008: secure_storage_reserved,
30009: secure_storage_reserved,
}
8 changes: 3 additions & 5 deletions src/infuse_iot/rpc_wrappers/kv_bt_peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import infuse_iot.generated.rpc_definitions as defs
from infuse_iot.commands import InfuseRpcCommand
from infuse_iot.util.argparse import BtLeAddress
from infuse_iot.util.ctypes import bytes_to_uint8
from infuse_iot.util.ctypes import VLACompatLittleEndianStruct, bytes_to_uint8


class kv_bt_peer(InfuseRpcCommand, defs.kv_write):
Expand All @@ -19,10 +19,8 @@ class request(ctypes.LittleEndianStructure):
]
_pack_ = 1

class response(InfuseRpcCommand.VariableSizeResponse):
base_fields = []
var_name = "rc"
var_type = ctypes.c_int16
class response(VLACompatLittleEndianStruct):
vla_field = ("rc", 0 * ctypes.c_int16)

@staticmethod
def kv_store_value_factory(id, value_bytes):
Expand Down
27 changes: 22 additions & 5 deletions src/infuse_iot/rpc_wrappers/kv_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import ctypes
import errno
import os
from typing import Any

from tabulate import tabulate

import infuse_iot.generated.kv_definitions as kv
import infuse_iot.generated.rpc_definitions as defs
from infuse_iot.commands import InfuseRpcCommand

Expand Down Expand Up @@ -69,12 +73,25 @@ def handle_response(self, return_code, response):
for r in response:
if r.len > 0:
b = bytes(r.data)
kv_type = kv.slots.ID_MAPPING[r.id]
kv_val = kv_type.vla_from_buffer_copy(b)

print(f"Key: {r.id} ({r.len} bytes):")
print(f"Key: {kv_type.NAME} ({r.len} bytes):")
print(f"\tHex: {b.hex()}")
try:
print(f"\tStr: {b.decode('utf-8')}")
except UnicodeDecodeError:
pass

fields = []
for field_name, field_val in kv_val.iter_fields():
fmt_val: Any
if isinstance(field_val, ctypes.Array):
if field_val._type_ == ctypes.c_char:
fmt_val = bytes(field_val).decode("utf-8")
elif field_val._type_ == ctypes.c_ubyte:
fmt_val = bytes(field_val).hex()
else:
fmt_val = list(field_val)
else:
fmt_val = field_val
fields.append((field_name, fmt_val))
print(tabulate(fields))
else:
print(f"Key: {r.id} (Failed to read '{errno.errorcode[-r.len]}')")
Loading
Loading