Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/infuse_iot/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import argparse
import ctypes

from infuse_iot.epacket import Auth
from infuse_iot.epacket.packet import Auth


class InfuseCommand:
Expand Down
21 changes: 21 additions & 0 deletions src/infuse_iot/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python3

import enum


class InfuseType(enum.Enum):
"""Infuse Data Types"""

ECHO_REQ = 0
ECHO_RSP = 1
TDF = 2
RPC_CMD = 3
RPC_DATA = 4
RPC_DATA_ACK = 5
RPC_RSP = 6
RECEIVED_EPACKET = 7
ACK = 8
SERIAL_LOG = 10
MEMFAULT_CHUNK = 30

KEY_IDS = 127
45 changes: 42 additions & 3 deletions src/infuse_iot/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from infuse_iot.api_client.api.default import get_shared_secret
from infuse_iot.api_client.models import Key
from infuse_iot.util.crypto import hkdf_derive
from infuse_iot.epacket.interface import Address as InterfaceAddress
from infuse_iot.credentials import get_api_key, load_network


Expand Down Expand Up @@ -52,13 +53,20 @@ def __init__(self, address, network_id=None, device_id=None):
def __init__(self):
self.gateway = None
self.devices: Dict[int, DeviceDatabase.DeviceState] = {}

def observe_serial(
self, address: int, network_id: int | None = None, device_id: int | None = None
self.bt_addr: Dict[InterfaceAddress.BluetoothLeAddr, int] = {}

def observe_device(
self,
address: int,
network_id: int | None = None,
device_id: int | None = None,
bt_addr: InterfaceAddress.BluetoothLeAddr | None = None,
):
"""Update device state based on observed packet"""
if self.gateway is None:
self.gateway = address
if bt_addr is not None:
self.bt_addr[bt_addr] = address
if address not in self.devices:
self.devices[address] = self.DeviceState(address)
if network_id is not None:
Expand Down Expand Up @@ -118,6 +126,9 @@ def _serial_key(self, base, time_idx):
def _bt_adv_key(self, base, time_idx):
return hkdf_derive(base, time_idx.to_bytes(4, "little"), b"bt_adv")

def _bt_gatt_key(self, base, time_idx):
return hkdf_derive(base, time_idx.to_bytes(4, "little"), b"bt_gatt")

def has_public_key(self, address: int):
"""Does the database have the public key for this device?"""
if address not in self.devices:
Expand All @@ -130,6 +141,12 @@ def has_network_id(self, address: int):
return False
return self.devices[address].network_id is not None

def infuse_id_from_bluetooth(
self, bt_addr: InterfaceAddress.BluetoothLeAddr
) -> int | None:
"""Get Bluetooth address associated with device"""
return self.bt_addr.get(bt_addr, None)

def serial_network_key(self, address: int, gps_time: int):
"""Network key for serial interface"""
if address not in self.devices:
Expand Down Expand Up @@ -173,3 +190,25 @@ def bt_adv_device_key(self, address: int, gps_time: int):
time_idx = gps_time // (60 * 60 * 24)

return self._bt_adv_key(base, time_idx)

def bt_gatt_network_key(self, address: int, gps_time: int):
"""Network key for Bluetooth advertising interface"""
if address not in self.devices:
raise DeviceUnknownNetworkKey
network_id = self.devices[address].network_id

return self._network_key(network_id, b"bt_gatt", gps_time)

def bt_gatt_device_key(self, address: int, gps_time: int):
"""Device key for Bluetooth advertising interface"""
if address not in self.devices:
raise DeviceUnknownDeviceKey
d = self.devices[address]
if d.device_id is None:
raise DeviceUnknownDeviceKey
base = self.devices[address].shared_key
if base is None:
raise DeviceUnknownDeviceKey
time_idx = gps_time // (60 * 60 * 24)

return self._bt_gatt_key(base, time_idx)
1 change: 1 addition & 0 deletions src/infuse_iot/epacket/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#!/usr/bin/env python3
15 changes: 15 additions & 0 deletions src/infuse_iot/epacket/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env python3

from typing import Dict
from typing_extensions import Self


class Serializable:
def to_json(self) -> Dict:
"""Convert class to json dictionary"""
raise NotImplementedError

@classmethod
def from_json(cls, values: Dict) -> Self:
"""Reconstruct class from json dictionary"""
raise NotImplementedError
117 changes: 117 additions & 0 deletions src/infuse_iot/epacket/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python3

import ctypes
import enum

from typing import Dict
from typing_extensions import Self

from infuse_iot.util.ctypes import bytes_to_uint8
from infuse_iot.epacket.common import Serializable
import infuse_iot.generated.rpc_definitions as rpc_defs


class ID(enum.Enum):
"""Interface identifier"""

SERIAL = 0
UDP = 1
BT_ADV = 2
BT_PERIPHERAL = 3
BT_CENTRAL = 4


class Address(Serializable):
class SerialAddr(Serializable):
def __str__(self):
return ""

def len(self):
return 0

def to_json(self) -> Dict:
return {"i": "SERIAL"}

@classmethod
def from_json(cls, values: Dict) -> Self:
return cls()

class BluetoothLeAddr(Serializable):
class CtypesFormat(ctypes.LittleEndianStructure):
_fields_ = [
("type", ctypes.c_uint8),
("addr", 6 * ctypes.c_uint8),
]
_pack_ = 1

def __init__(self, addr_type: int, addr_val: int):
self.addr_type = addr_type
self.addr_val = addr_val

def __hash__(self) -> int:
return (self.addr_type << 48) + self.addr_val

def __eq__(self, another) -> bool:
return (
self.addr_type == another.addr_type
and self.addr_val == another.addr_val
)

def __str__(self) -> str:
t = "random" if self.addr_type == 1 else "public"
v = ":".join([f"{x:02x}" for x in self.addr_val.to_bytes(6, "big")])
return f"{v} ({t})"

def len(self):
return ctypes.sizeof(self.CtypesFormat)

def to_json(self) -> Dict:
return {"i": "BT", "t": self.addr_type, "v": self.addr_val}

@classmethod
def from_json(cls, values: Dict) -> Self:
return cls(values["t"], values["v"])

def to_rpc_struct(self) -> rpc_defs.rpc_struct_bt_addr_le:
"""Convert the address to the common RPC address structure"""

return rpc_defs.rpc_struct_bt_addr_le(
self.addr_type, bytes_to_uint8(self.addr_val.to_bytes(6, "little"))
)

@classmethod
def from_rpc_struct(cls, struct: rpc_defs.rpc_struct_bt_addr_le):
"""Create instance from the common RPC address structure"""

return cls(struct.type, int.from_bytes(struct.val, "little"))

def __init__(self, val):
self.val = val

def __str__(self):
return str(self.val)

def len(self):
return self.val.len()

def to_json(self) -> Dict:
return self.val.to_json()

@classmethod
def from_json(cls, values: Dict) -> Self:
if values["i"] == "BT":
return cls(cls.BluetoothLeAddr.from_json(values))
elif values["i"] == "SERIAL":
return cls(cls.SerialAddr())
raise NotImplementedError("Unknown address type")

@classmethod
def from_bytes(cls, interface: ID, stream: bytes) -> Self:
assert interface in [
ID.BT_ADV,
ID.BT_PERIPHERAL,
ID.BT_CENTRAL,
]

c = cls.BluetoothLeAddr.CtypesFormat.from_buffer_copy(stream)
return cls.BluetoothLeAddr(c.type, int.from_bytes(bytes(c.addr), "little"))
Loading