Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/infuse_iot/socket_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def receive(self) -> GatewayRequest | None:

def close(self):
self._input_sock.close()
self._output_addr.close()
self._output_sock.close()


class LocalClient:
Expand Down
164 changes: 62 additions & 102 deletions src/infuse_iot/tools/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
except NotImplementedError:
TerminalMenu = None

from pynrfjprog import LowLevel, Parameters

from infuse_iot.api_client import Client
from infuse_iot.api_client.api.board import get_board_by_id, get_boards
from infuse_iot.api_client.api.device import (
Expand All @@ -26,6 +24,7 @@
from infuse_iot.api_client.models import Board, Error, NewDevice, NewDeviceMetadata
from infuse_iot.commands import InfuseCommand
from infuse_iot.credentials import get_api_key
from infuse_iot.util.soc import nrf, soc, stm


class ProvisioningStruct(ctypes.LittleEndianStructure):
Expand All @@ -42,6 +41,13 @@ class SubCommand(InfuseCommand):

@classmethod
def add_parser(cls, parser):
vendor_group = parser.add_mutually_exclusive_group(required=True)
vendor_group.add_argument(
"--nrf", dest="vendor", action="store_const", const="nrf", help="Nordic Semiconductor SoC"
)
vendor_group.add_argument(
"--stm", dest="vendor", action="store_const", const="stm", help="ST Microelectronics SoC"
)
parser.add_argument(
"--snr",
type=int,
Expand All @@ -66,6 +72,7 @@ def add_parser(cls, parser):
)

def __init__(self, args):
self._vendor = args.vendor
self._snr = args.snr
self._board = args.board
self._org = args.organisation
Expand All @@ -76,55 +83,7 @@ def __init__(self, args):
key, val = meta.strip().split("=", 1)
self._metadata[key.strip()] = val

def nrf_device_info(self, api: LowLevel.API) -> tuple[str, int, int]:
"""Retrive device ID and customer UICR address"""
device_id_offsets = {
# nRF52840 only
"NRF52": 0x60,
"NRF53": 0x204,
"NRF91": 0x204,
}
customer_offsets = {
# nRF52840 only
"NRF52": 0x80,
"NRF53": 0x100,
"NRF91": 0x108,
}
family = api.read_device_family()
device_info = api.read_device_info()

if device_info[1] == Parameters.DeviceName.NRF9120:
# Use version to determine nRF9151 vs nRF9161
if device_info[0] == Parameters.DeviceVersion.NRF9120_xxAA_REV3:
soc = "nRF9151"
else:
sys.exit(f"Unknown device: {device_info[0]}")
else:
socs = {
Parameters.DeviceName.NRF52840: "nRF52840",
Parameters.DeviceName.NRF5340: "nRF5340",
Parameters.DeviceName.NRF9160: "nRF9160",
}
if device_info[1] not in socs:
sys.exit(f"Unknown device: {device_info[1]}")
soc = socs[device_info[1]]

uicr_addr = None
dev_id = None
for desc in api.read_memory_descriptors():
if desc.type == Parameters.MemoryType.UICR:
uicr_addr = desc.start + customer_offsets[family]
if desc.type == Parameters.MemoryType.FICR:
dev_id_addr = desc.start + device_id_offsets[family]
dev_id_bytes = bytes(api.read(dev_id_addr, 8))
dev_id = int.from_bytes(dev_id_bytes, "big")

assert uicr_addr is not None
assert dev_id is not None

return soc, uicr_addr, dev_id

def create_device(self, client, soc, hardware_id_str):
def create_device(self, client: Client, soc_name: str, hardware_id_str: str):
if self._org is None:
orgs = get_all_organisations.sync(client=client)
if isinstance(orgs, Error) or orgs is None:
Expand Down Expand Up @@ -153,8 +112,8 @@ def create_device(self, client, soc, hardware_id_str):
board = get_board_by_id.sync(client=client, id=self._board)
if not isinstance(board, Board):
sys.exit(f"Board query failed {board}")
if board.soc != soc:
sys.exit(f"Found SoC '{soc}' but board '{board.name}' has SoC '{board.soc}'")
if board.soc != soc_name:
sys.exit(f"Found SoC '{soc_name}' but board '{board.name}' has SoC '{board.soc}'")

new_board = NewDevice(
mcu_id=hardware_id_str,
Expand All @@ -170,60 +129,61 @@ def create_device(self, client, soc, hardware_id_str):
sys.exit(f"Failed to create device:\n\t<{response.status_code}> {response.content.decode('utf-8')}")

def run(self):
with LowLevel.API() as api:
if self._snr is None:
api.connect_to_emu_without_snr()
else:
api.connect_to_emu_with_snr(self._snr)
interface: soc.ProvisioningInterface
if self._vendor == "nrf":
interface = nrf.Interface(self._snr)
if self._vendor == "stm":
interface = stm.Interface()
else:
raise NotImplementedError(f"Unhandled vendor '{self._vendor}'")

soc, uicr_addr, hardware_id = self.nrf_device_info(api)
hardware_id_str = f"{hardware_id:016x}"
hardware_id = interface.unique_device_id()
hardware_id_str = f"{hardware_id:0{2*interface.unique_device_id_len}x}"

client = Client(base_url="https://api.infuse-iot.com").with_headers(
{"x-api-key": f"Bearer {get_api_key()}"}
)
client = Client(base_url="https://api.infuse-iot.com").with_headers({"x-api-key": f"Bearer {get_api_key()}"})

# Get existing device or create new device
with client as client:
response = get_device_by_soc_and_mcu_id.sync_detailed(client=client, soc=soc, mcu_id=hardware_id_str)
if response.status_code == HTTPStatus.OK:
# Device found, fall through
pass
elif response.status_code == HTTPStatus.NOT_FOUND:
# Create new device here
self.create_device(client, soc, hardware_id_str)
# Query information back out
response = get_device_by_soc_and_mcu_id.sync_detailed(
client=client, soc=soc, mcu_id=hardware_id_str
)
if response.status_code != HTTPStatus.OK:
err = "Failed to query device after creation:\n"
err += f"\t<{response.status_code}> {response.content.decode('utf-8')}"
sys.exit(err)
print("To provision more devices like this:")
print(f"\t infuse provision --organisation {self._org} --board {self._board}")
else:
err = "Failed to query device information:\n"
# Get existing device or create new device
with client as client:
response = get_device_by_soc_and_mcu_id.sync_detailed(
client=client, soc=interface.soc_name, mcu_id=hardware_id_str
)
if response.status_code == HTTPStatus.OK:
# Device found, fall through
pass
elif response.status_code == HTTPStatus.NOT_FOUND:
# Create new device here
self.create_device(client, interface.soc_name, hardware_id_str)
# Query information back out
response = get_device_by_soc_and_mcu_id.sync_detailed(
client=client, soc=interface.soc_name, mcu_id=hardware_id_str
)
if response.status_code != HTTPStatus.OK:
err = "Failed to query device after creation:\n"
err += f"\t<{response.status_code}> {response.content.decode('utf-8')}"
sys.exit(err)

assert response.parsed is not None
assert isinstance(response.parsed.device_id, str)
# Compare current flash contents to desired flash contents
cloud_id = int(response.parsed.device_id, 16)
current_bytes = bytes(api.read(uicr_addr, ctypes.sizeof(ProvisioningStruct)))
desired = ProvisioningStruct(cloud_id)
desired_bytes = bytes(desired)

if current_bytes == desired_bytes:
print(f"HW ID 0x{hardware_id:016x} already provisioned as 0x{desired.device_id:016x}")
print("To provision more devices like this:")
print(f"\t infuse provision --organisation {self._org} --board {self._board}")
else:
if current_bytes != len(current_bytes) * b"\xff":
print(f"HW ID 0x{hardware_id:016x} already has incorrect provisioning info, recover device")
return
err = "Failed to query device information:\n"
err += f"\t<{response.status_code}> {response.content.decode('utf-8')}"
sys.exit(err)

assert response.parsed is not None
assert isinstance(response.parsed.device_id, str)
# Compare current flash contents to desired flash contents
cloud_id = int(response.parsed.device_id, 16)
current_bytes = interface.read_provisioned_data(ctypes.sizeof(ProvisioningStruct))
desired = ProvisioningStruct(cloud_id)
desired_bytes = bytes(desired)

if current_bytes == desired_bytes:
print(f"HW ID 0x{hardware_id:016x} already provisioned as 0x{desired.device_id:016x}")
else:
if current_bytes != len(current_bytes) * b"\xff":
print(f"HW ID 0x{hardware_id:016x} already has incorrect provisioning info, recover device")
return

api.write(uicr_addr, bytes(desired), True)
print(f"HW ID 0x{hardware_id:016x} now provisioned as 0x{desired.device_id:016x}")
interface.write_provisioning_data(bytes(desired))
print(f"HW ID 0x{hardware_id:016x} now provisioned as 0x{desired.device_id:016x}")

# Reset the MCU
api.pin_reset()
interface.close()
1 change: 1 addition & 0 deletions src/infuse_iot/util/soc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#!/usr/bin/env python3
114 changes: 114 additions & 0 deletions src/infuse_iot/util/soc/nrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright (c) 2020 Teslabs Engineering S.L.
# Copyright (c) 2025 Embeint Inc
#
# SPDX-License-Identifier: Apache-2.0


from pynrfjprog import LowLevel, Parameters

from infuse_iot.util.soc.soc import ProvisioningInterface


class NRFFamily:
DEVICE_ID_OFFSET: int
CUSTOMER_OFFSET: int

@staticmethod
def soc(device_info):
raise NotImplementedError


class nRF52840(NRFFamily):
DEVICE_ID_OFFSET = 0x60
CUSTOMER_OFFSET = 0x80

@staticmethod
def soc(device_info):
assert device_info[1] == Parameters.DeviceName.NRF52840
return "nRF52840"


class nRF5340(NRFFamily):
DEVICE_ID_OFFSET = 0x204
CUSTOMER_OFFSET = 0x100

@staticmethod
def soc(device_info):
assert device_info[1] == Parameters.DeviceName.NRF5340
return "nRF5340"


class nRF91(NRFFamily):
DEVICE_ID_OFFSET = 0x204
CUSTOMER_OFFSET = 0x108

@staticmethod
def soc(device_info):
if device_info[1] == Parameters.DeviceName.NRF9160:
return "nRF9160"
elif device_info[1] == Parameters.DeviceName.NRF9120:
# Use version to determine nRF9151 vs nRF9161
if device_info[0] == Parameters.DeviceVersion.NRF9120_xxAA_REV3:
return "nRF9151"
else:
raise NotImplementedError(f"Unknown device: {device_info[0]}")
else:
raise NotImplementedError(f"Unknown device {device_info[1]}")


DEVICE_FAMILY_MAPPING: dict[str, type[NRFFamily]] = {
"NRF52": nRF52840,
"NRF53": nRF5340,
"NRF91": nRF91,
}


class Interface(ProvisioningInterface):
def __init__(self, snr: int | None):
self._api = LowLevel.API()
self._api.open()
if snr is None:
self._api.connect_to_emu_without_snr()
else:
self._api.connect_to_emu_with_snr(snr)
self._family = DEVICE_FAMILY_MAPPING[self._api.read_device_family()]
self._device_info = self._api.read_device_info()
self._soc_name = self._family.soc(self._device_info)

def close(self):
self._api.pin_reset()
self._api.disconnect_from_emu()
self._api.close()

def _find_region(self, memory_type: Parameters.MemoryType):
for desc in self._api.read_memory_descriptors():
if desc.type == memory_type:
return desc
raise RuntimeError(f"Could not find memory region {memory_type}")

@property
def soc_name(self) -> str:
return self._soc_name

@property
def unique_device_id_len(self) -> int:
return 8

def unique_device_id(self) -> int:
ficr = self._find_region(Parameters.MemoryType.FICR)
dev_id_addr = ficr.start + self._family.DEVICE_ID_OFFSET

dev_id_bytes = bytes(self._api.read(dev_id_addr, 8))
return int.from_bytes(dev_id_bytes, "big")

def read_provisioned_data(self, num: int) -> bytes:
uicr = self._find_region(Parameters.MemoryType.UICR)
customer_addr = uicr.start + self._family.CUSTOMER_OFFSET

return bytes(self._api.read(customer_addr, num))

def write_provisioning_data(self, data: bytes):
uicr = self._find_region(Parameters.MemoryType.UICR)
customer_addr = uicr.start + self._family.CUSTOMER_OFFSET

self._api.write(customer_addr, data, True)
33 changes: 33 additions & 0 deletions src/infuse_iot/util/soc/soc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env

from abc import ABCMeta, abstractmethod


class ProvisioningInterface(metaclass=ABCMeta):
"Generic SoC provisioning interface"

@property
@abstractmethod
def soc_name(self) -> str:
"Infuse-IoT SoC name"

@property
@abstractmethod
def unique_device_id_len(self) -> int:
"Length of the unique device ID in bytes"

@abstractmethod
def unique_device_id(self) -> int:
"""Read unique device ID"""

@abstractmethod
def read_provisioned_data(self, num: int) -> bytes:
"""Read currently provisioned data from device"""

@abstractmethod
def write_provisioning_data(self, data: bytes):
"""Write provisioning data to device"""

@abstractmethod
def close(self) -> None:
"Close any interfaces opened"
Loading
Loading