Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/infuse_iot/generated/kv_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class kv_string(VLACompatLittleEndianStruct):
vla_field = ("value", 0 * ctypes.c_char)
_pack_ = 1

def __str__(self) -> str:
return bytes(self.value).decode('utf-8')

class kv_mcuboot_img_sem_ver(VLACompatLittleEndianStruct):
"""MCUboot semantic versioning struct"""

Expand Down Expand Up @@ -224,10 +227,23 @@ class lte_pdp_config(VLACompatLittleEndianStruct):
BASE_ID = 45
RANGE = 1
_fields_ = [
("family", ctypes.c_uint8),
]
vla_field = ("apn", structs.kv_string)
_pack_ = 1

class lte_networking_modes(VLACompatLittleEndianStruct):
"""Enabled LTE networking modes and preferences"""

NAME = "LTE_NETWORKING_MODES"
BASE_ID = 46
RANGE = 1
_fields_ = [
("modes", ctypes.c_uint8),
("prefer", ctypes.c_uint8),
]
_pack_ = 1

class bluetooth_peer(VLACompatLittleEndianStruct):
"""Bluetooth peer device"""

Expand Down Expand Up @@ -259,6 +275,7 @@ class geofence(VLACompatLittleEndianStruct):
BASE_ID = 100
RANGE = 16
_fields_ = [
("points_num", ctypes.c_uint8),
]
vla_field = ("points", 0 * structs.gcs_location)
_pack_ = 1
Expand All @@ -270,6 +287,7 @@ class secure_storage_reserved(VLACompatLittleEndianStruct):
BASE_ID = 30000
RANGE = 10
_fields_ = [
("data_num", ctypes.c_uint8),
]
vla_field = ("data", 0 * ctypes.c_uint8)
_pack_ = 1
Expand All @@ -291,6 +309,7 @@ class secure_storage_reserved(VLACompatLittleEndianStruct):
43: lte_modem_imei,
44: lte_sim_uicc,
45: lte_pdp_config,
46: lte_networking_modes,
50: bluetooth_peer,
60: gravity_reference,
100: geofence,
Expand Down
54 changes: 35 additions & 19 deletions src/infuse_iot/rpc_wrappers/lte_modem_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
# import ctypes
import argparse

from infuse_iot.generated.kv_definitions import slots as kv_slots
from infuse_iot.generated.kv_definitions import structs as kv_structs
from infuse_iot.zephyr import lte as z_lte
from infuse_iot.zephyr.errno import errno

from . import kv_read, lte_pdp_ctx
Expand All @@ -23,29 +26,42 @@ def add_parser(cls, parser):
return

def __init__(self, _args):
super().__init__(argparse.Namespace(keys=[40, 41, 42, 43, 44, 45]))
super().__init__(argparse.Namespace(keys=[40, 41, 42, 43, 44, 45, 46]))

def handle_response(self, return_code, response):
if return_code != 0:
print(f"Failed to query modem info ({errno.strerror(-return_code)})")
return

unknown = b"_unknown"
modem_model = bytes(response[0].data) if response[0].len > 0 else unknown
modem_firmware = bytes(response[1].data) if response[1].len > 0 else unknown
modem_esn = bytes(response[2].data) if response[2].len > 0 else unknown
modem_imei = bytes(response[3].data) if response[3].len > 0 else unknown
sim_uicc = bytes(response[4].data) if response[4].len > 0 else unknown
if response[5].len > 0:
family = lte_pdp_ctx.lte_pdp_ctx.PDPFamily(response[5].data[0])
apn = bytes(response[5].data[2:]).decode("utf-8")
pdp_cfg = f'"{apn}" ({family.name})'
def struct_decode(t, r):
if r.len <= 0:
return None
return t.vla_from_buffer_copy(bytes(r.data))

def str_decode(r):
if r.len <= 0:
return "Unknown"
return str(kv_structs.kv_string.vla_from_buffer_copy(bytes(r.data)))

modem_imei = struct_decode(kv_slots.lte_modem_imei, response[3])
pdp_ctx = struct_decode(kv_slots.lte_pdp_config, response[5])
system_modes = struct_decode(kv_slots.lte_networking_modes, response[6])

if pdp_ctx:
pdp_str = f'"{str(pdp_ctx.apn)}" ({lte_pdp_ctx.lte_pdp_ctx.PDPFamily(pdp_ctx.family).name})'
else:
pdp_cfg = "default"

print(f"\t Model: {modem_model[1:].decode('utf-8')}")
print(f"\tFirmware: {modem_firmware[1:].decode('utf-8')}")
print(f"\t ESN: {modem_esn[1:].decode('utf-8')}")
print(f"\t IMEI: {int.from_bytes(modem_imei, 'little')}")
print(f"\t SIM: {sim_uicc[1:].decode('utf-8')}")
print(f"\t APN: {pdp_cfg}")
pdp_str = "unknown"
if system_modes:
system_mode = z_lte.LteSystemMode(system_modes.modes)
system_pref = z_lte.LteSystemPreference(system_modes.prefer)
modes_str = f"{system_mode} (Prefer: {system_pref})"
else:
modes_str = "default"

print(f"\t Model: {str_decode(response[0])}")
print(f"\tFirmware: {str_decode(response[1])}")
print(f"\t ESN: {str_decode(response[2])}")
print(f"\t IMEI: {modem_imei.imei}")
print(f"\t SIM: {str_decode(response[4])}")
print(f"\t APN: {pdp_str}")
print(f"\t Mode: {modes_str}")
9 changes: 7 additions & 2 deletions src/infuse_iot/tools/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def __init__(self, common: CommonThreadState, log: io.TextIOWrapper):
self._reconstructor.send(None)
self._line = ""
self._log = log
self._next_ping = 0.0
super().__init__(self._iter)

def _iter(self):
Expand Down Expand Up @@ -196,8 +197,12 @@ def _handle_serial_frame(self, frame: bytearray):
assert self._common.ddb.gateway is not None
if not self._common.ddb.has_network_id(self._common.ddb.gateway):
# Need to know network ID before we can query the device key
self._common.port.ping()
Console.log_info(f"Dropping {len(frame)} byte packet to query network ID...")
if time.time() >= self._next_ping:
self._next_ping = time.time() + 1.1
self._common.port.ping()
Console.log_info(f"Dropping {len(frame)} byte packet to query network ID...")
else:
Console.log_info(f"Dropping {len(frame)} byte packet...")
else:
self._common.query_device_key(None)
Console.log_info(f"Dropping {len(frame)} byte packet to query device key...")
Expand Down
9 changes: 5 additions & 4 deletions src/infuse_iot/tools/native_bt.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, database: DeviceDatabase, server: LocalServer, bleak_mapping:
self._server = server
self._mapping = bleak_mapping
self._queues: dict[int, asyncio.Queue] = {}
self._tasks: dict[int, asyncio.Task] = {}

def notification_handler(self, _characteristic: BleakGATTCharacteristic, data: bytearray):
hdr, decr = CtypeBtGattFrame.decrypt(self._db, None, bytes(data))
Expand Down Expand Up @@ -140,6 +141,7 @@ async def create_connection(self, request: GatewayRequestConnectionRequest, dev:

# Queue no longer being handled
self._queues.pop(request.infuse_id)
self._tasks.pop(request.infuse_id)
Console.log_info(f"{dev}: Terminating connection")

def datagram_received(self, data: bytes, addr: tuple[str | Any, int]):
Expand Down Expand Up @@ -168,7 +170,7 @@ def datagram_received(self, data: bytes, addr: tuple[str | Any, int]):
q = asyncio.Queue()
self._queues[request.infuse_id] = q
# Create task to handle the connection
loop.create_task(self.create_connection(request, ble_dev, q))
self._tasks[request.infuse_id] = loop.create_task(self.create_connection(request, ble_dev, q))

def error_received(self, exc):
Console.log_error(f"Error received: {exc}")
Expand Down Expand Up @@ -234,15 +236,14 @@ def simple_callback(self, device: BLEDevice, data: AdvertisementData):

async def async_bt_receiver(self):
loop = asyncio.get_event_loop()
loop.create_task(self.server_handler())
handler = loop.create_task(self.server_handler())

scanner = BleakScanner(self.simple_callback, [str(InfuseBluetoothUUID.SERVICE_UUID)], cb=dict(use_bdaddr=True))

while True:
Console.log_info("Starting scanner")
async with scanner:
# Run the scanner forever
await asyncio.Future()
await handler

def sync_request_handler(self):
# Loop while there are packets to send
Expand Down
42 changes: 42 additions & 0 deletions src/infuse_iot/zephyr/lte.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,48 @@ def __str__(self):
return pretty_names[self.value]


class LteSystemMode(enum.IntEnum):
LTE_M = 1
NB_IOT = 2
GNSS = 3
LTE_M_GNSS = 4
NB_IOT_GNSS = 5
LTE_M_NB_IOT = 6
LTE_M_NB_IOT_GNSS = 7
DEFAULT = 0xFF

def __str__(self):
pretty_names = {
self.LTE_M: "LTE-M",
self.NB_IOT: "NB-IoT",
self.GNSS: "GNSS",
self.LTE_M_GNSS: "LTE-M + GNSS",
self.NB_IOT_GNSS: "NB-IoT + GNSS",
self.LTE_M_NB_IOT: "LTE-M + NB-IoT",
self.LTE_M_NB_IOT_GNSS: "LTE-M + NB-IoT + GNSS",
self.DEFAULT: "Modem default",
}
return pretty_names[self.value]


class LteSystemPreference(enum.IntEnum):
AUTO = 0
LTE_M = 1
NB_IOT = 2
PLMN_LTE_M = 3
PLMN_NB_IOT = 4

def __str__(self):
pretty_names = {
self.AUTO: "No preference",
self.LTE_M: "LTE-M",
self.NB_IOT: "NB-IoT",
self.PLMN_LTE_M: "PLMN > LTE-M",
self.PLMN_NB_IOT: "PLMN > NB-IoT",
}
return pretty_names[self.value]


class LteBand:
def __init__(self, band, freq_dl_low, offset_dl, freq_ul_low, offset_ul):
self.band = band
Expand Down