Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[mypy]

[mypy-colorama]
ignore_missing_imports = True

[mypy-serial]
ignore_missing_imports = True

[mypy-pylink]
ignore_missing_imports = True

[mypy-pynrfjprog]
ignore_missing_imports = True

[mypy-simple_term_menu]
ignore_missing_imports = True

[mypy-dateutil.*]
ignore_missing_imports = True

[mypy-plotly.*]
ignore_missing_imports = True

[mypy-dash.*]
ignore_missing_imports = True
13 changes: 10 additions & 3 deletions src/infuse_iot/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@
import argparse
import ctypes

from typing import List, Type, Tuple


from infuse_iot.epacket.packet import Auth


class InfuseCommand:
"""Infuse-IoT SDK meta-tool command parent class"""

NAME = "N/A"
HELP = "N/A"
DESCRIPTION = "N/A"

@classmethod
def add_parser(cls, parser: argparse.ArgumentParser):
"""Add arguments for sub-command"""

def __init__(self, **kwargs):
def __init__(self, args: argparse.Namespace):
pass

def run(self):
Expand Down Expand Up @@ -56,9 +63,9 @@ def handle_response(self, return_code, response):
raise NotImplementedError

class VariableSizeResponse:
base_fields = []
base_fields: List[Tuple[str, Type[ctypes._SimpleCData]]] = []
var_name = "x"
var_type = ctypes.c_ubyte
var_type: Type[ctypes._SimpleCData] = ctypes.c_ubyte

@classmethod
def from_buffer_copy(cls, source, offset=0):
Expand Down
58 changes: 35 additions & 23 deletions src/infuse_iot/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import binascii
import base64
from typing import Dict
from typing import Dict, Tuple

from infuse_iot.api_client import Client
from infuse_iot.api_client.api.default import get_shared_secret
Expand Down Expand Up @@ -38,27 +38,32 @@ class DeviceDatabase:
_network_keys = {
0x000000: b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f",
}
_derived_keys = {}
_derived_keys: Dict[Tuple[int, bytes, int], bytes] = {}

class DeviceState:
"""Device State"""

def __init__(self, address, network_id=None, device_id=None):
def __init__(
self,
address: int,
network_id: int | None = None,
device_id: int | None = None,
):
self.address = address
self.network_id = network_id
self.device_id = device_id
self.bt_addr: InterfaceAddress.BluetoothLeAddr | None = None
self.public_key = None
self.shared_key = None
self.public_key: bytes | None = None
self.shared_key: bytes | None = None
self._tx_gatt_seq = 0

def gatt_sequence_num(self):
"""Persistent auto-incrementing sequence number for GATT"""
self._tx_gatt_seq += 1
return self._tx_gatt_seq

def __init__(self):
self.gateway = None
def __init__(self) -> None:
self.gateway: int | None = None
self.devices: Dict[int, DeviceDatabase.DeviceState] = {}
self.bt_addr: Dict[InterfaceAddress.BluetoothLeAddr, int] = {}

Expand All @@ -68,7 +73,7 @@ def observe_device(
network_id: int | None = None,
device_id: int | None = None,
bt_addr: InterfaceAddress.BluetoothLeAddr | None = None,
):
) -> None:
"""Update device state based on observed packet"""
if self.gateway is None:
self.gateway = address
Expand All @@ -91,7 +96,7 @@ def observe_device(

def observe_security_state(
self, address: int, cloud_key: bytes, device_key: bytes, network_id: int
):
) -> None:
"""Update device state based on security_state response"""
if address not in self.devices:
self.devices[address] = self.DeviceState(address)
Expand All @@ -107,10 +112,11 @@ def observe_security_state(
with client as client:
body = Key(base64.b64encode(device_key).decode("utf-8"))
response = get_shared_secret.sync(client=client, body=body)
key = base64.b64decode(response.key)
self.devices[address].shared_key = key
if response is not None:
key = base64.b64decode(response.key)
self.devices[address].shared_key = key

def _network_key(self, network_id: int, interface: str, gps_time: int):
def _network_key(self, network_id: int, interface: bytes, gps_time: int) -> bytes:
if network_id not in self._network_keys:
try:
info = load_network(network_id)
Expand All @@ -128,22 +134,22 @@ def _network_key(self, network_id: int, interface: str, gps_time: int):

return self._derived_keys[key_id]

def _serial_key(self, base, time_idx):
def _serial_key(self, base: bytes, time_idx: int) -> bytes:
return hkdf_derive(base, time_idx.to_bytes(4, "little"), b"serial")

def _bt_adv_key(self, base, time_idx):
def _bt_adv_key(self, base: bytes, time_idx: int) -> bytes:
return hkdf_derive(base, time_idx.to_bytes(4, "little"), b"bt_adv")

def _bt_gatt_key(self, base, time_idx):
def _bt_gatt_key(self, base: bytes, time_idx: int) -> bytes:
return hkdf_derive(base, time_idx.to_bytes(4, "little"), b"bt_gatt")

def has_public_key(self, address: int):
def has_public_key(self, address: int) -> bool:
"""Does the database have the public key for this device?"""
if address not in self.devices:
return False
return self.devices[address].public_key is not None

def has_network_id(self, address: int):
def has_network_id(self, address: int) -> bool:
"""Does the database know the network ID for this device?"""
if address not in self.devices:
return False
Expand All @@ -155,15 +161,17 @@ def infuse_id_from_bluetooth(
"""Get Bluetooth address associated with device"""
return self.bt_addr.get(bt_addr, None)

def serial_network_key(self, address: int, gps_time: int):
def serial_network_key(self, address: int, gps_time: int) -> bytes:
"""Network key for serial interface"""
if address not in self.devices:
raise DeviceUnknownNetworkKey
network_id = self.devices[address].network_id
if network_id is None:
raise DeviceUnknownNetworkKey

return self._network_key(network_id, b"serial", gps_time)

def serial_device_key(self, address: int, gps_time: int):
def serial_device_key(self, address: int, gps_time: int) -> bytes:
"""Device key for serial interface"""
if address not in self.devices:
raise DeviceUnknownDeviceKey
Expand All @@ -177,15 +185,17 @@ def serial_device_key(self, address: int, gps_time: int):

return self._serial_key(base, time_idx)

def bt_adv_network_key(self, address: int, gps_time: int):
def bt_adv_network_key(self, address: int, gps_time: int) -> bytes:
"""Network key for Bluetooth advertising interface"""
if address not in self.devices:
raise DeviceUnknownNetworkKey
network_id = self.devices[address].network_id
if network_id is None:
raise DeviceUnknownNetworkKey

return self._network_key(network_id, b"bt_adv", gps_time)

def bt_adv_device_key(self, address: int, gps_time: int):
def bt_adv_device_key(self, address: int, gps_time: int) -> bytes:
"""Device key for Bluetooth advertising interface"""
if address not in self.devices:
raise DeviceUnknownDeviceKey
Expand All @@ -199,15 +209,17 @@ def bt_adv_device_key(self, address: int, gps_time: int):

return self._bt_adv_key(base, time_idx)

def bt_gatt_network_key(self, address: int, gps_time: int):
def bt_gatt_network_key(self, address: int, gps_time: int) -> bytes:
"""Network key for Bluetooth advertising interface"""
if address not in self.devices:
raise DeviceUnknownNetworkKey
network_id = self.devices[address].network_id
if network_id is None:
raise DeviceUnknownNetworkKey

return self._network_key(network_id, b"bt_gatt", gps_time)

def bt_gatt_device_key(self, address: int, gps_time: int):
def bt_gatt_device_key(self, address: int, gps_time: int) -> bytes:
"""Device key for Bluetooth advertising interface"""
if address not in self.devices:
raise DeviceUnknownDeviceKey
Expand Down
Loading