Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/infuse_iot/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import enum
import uuid


class InfuseType(enum.IntEnum):
Expand All @@ -26,3 +27,10 @@ class InfuseID(enum.IntEnum):
"""Hardcoded Infuse IDs"""

GATEWAY = -1


class InfuseBluetoothUUID:
SERVICE_UUID = uuid.UUID("0000fc74-0000-1000-8000-00805f9b34fb")
COMMAND_CHAR = uuid.UUID("dc0b71b7-fc74-fc74-aa01-8aba434a893d")
DATA_CHAR = uuid.UUID("dc0b71b7-fc74-fc74-aa02-8aba434a893d")
LOGGING_CHAR = uuid.UUID("dc0b71b7-fc74-fc74-aa03-8aba434a893d")
2 changes: 1 addition & 1 deletion src/infuse_iot/epacket/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def encrypt(
return header_bytes + ciphertext

@classmethod
def decrypt(cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr, frame: bytes):
def decrypt(cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr | None, frame: bytes):
header = cls.from_buffer_copy(frame)
if header.flags & Flags.ENCR_DEVICE:
database.observe_device(header.device_id, device_id=header.key_metadata, bt_addr=bt_addr)
Expand Down
8 changes: 4 additions & 4 deletions src/infuse_iot/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import enum


class InfuseTimeSource(enum.IntFlag):
class InfuseTimeSource(enum.IntEnum):
NONE = 0
GNSS = 0x01
NTP = 0x02
RPC = 0x04
GNSS = 1
NTP = 2
RPC = 3
RECOVERED = 0x80

def __str__(self) -> str:
Expand Down
178 changes: 170 additions & 8 deletions src/infuse_iot/tools/native_bt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,170 @@
__author__ = "Jordan Yates"
__copyright__ = "Copyright 2024, Embeint Inc"

import argparse
import asyncio
import ctypes
import json
from typing import Any

from bleak import BleakScanner
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

import infuse_iot.epacket.interface as interface
from infuse_iot.commands import InfuseCommand
from infuse_iot.common import InfuseBluetoothUUID, InfuseType
from infuse_iot.database import DeviceDatabase
from infuse_iot.epacket import interface
from infuse_iot.epacket.packet import (
Auth,
CtypeBtAdvFrame,
CtypeBtGattFrame,
Flags,
HopReceived,
PacketOutput,
PacketReceived,
)
from infuse_iot.socket_comms import (
ClientNotificationConnectionCreated,
ClientNotificationConnectionFailed,
ClientNotificationEpacketReceived,
GatewayRequest,
GatewayRequestConnection,
GatewayRequestConnectionRelease,
GatewayRequestConnectionRequest,
GatewayRequestEpacketSend,
LocalServer,
default_multicast_address,
)
from infuse_iot.util.argparse import BtLeAddress
from infuse_iot.util.console import Console


class InfuseGattReadResponse(ctypes.LittleEndianStructure):
"""Response to any read request on Infuse-IoT characteristics"""

_fields_ = [
("cloud_public_key", 32 * ctypes.c_uint8),
("device_public_key", 32 * ctypes.c_uint8),
("network_id", ctypes.c_uint32),
]
_pack_ = 1


class MulticastHandler(asyncio.DatagramProtocol):
def __init__(self, database: DeviceDatabase, server: LocalServer, bleak_mapping: dict[int, BLEDevice]):
self._db = database
self._server = server
self._mapping = bleak_mapping
self._queues: dict[int, asyncio.Queue] = {}

def notification_handler(self, _characteristic: BleakGATTCharacteristic, data: bytearray):
hdr, decr = CtypeBtGattFrame.decrypt(self._db, None, bytes(data))
# Correct values are annoying to get here
if_addr = interface.Address(interface.Address.BluetoothLeAddr(0, 0))
rssi = 0

bt_hop = HopReceived(
hdr.device_id,
interface.ID.BT_CENTRAL,
if_addr,
(Auth.DEVICE if hdr.flags & Flags.ENCR_DEVICE else Auth.NETWORK),
hdr.key_metadata,
hdr.gps_time,
hdr.sequence,
rssi,
)
pkt = PacketReceived(
[bt_hop],
hdr.type,
bytes(decr),
)
Console.log_rx(pkt.ptype, len(data))
self._server.broadcast(ClientNotificationEpacketReceived(pkt))

async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio.Queue):
Console.log_info(f"{dev}: Initiating connection")
async with BleakClient(dev) as client:
# Modified from bleak example code
if client._backend.__class__.__name__ == "BleakClientBlueZDBus":
await client._backend._acquire_mtu() # type: ignore

security_info = await client.read_gatt_char(InfuseBluetoothUUID.COMMAND_CHAR)
resp = InfuseGattReadResponse.from_buffer_copy(security_info)
self._db.observe_security_state(
infuse_id,
bytes(resp.cloud_public_key),
bytes(resp.device_public_key),
resp.network_id,
)

await client.start_notify(InfuseBluetoothUUID.COMMAND_CHAR, self.notification_handler)
Console.log_info(f"{dev}: Connected (MTU {client.mtu_size})")

self._server.broadcast(
ClientNotificationConnectionCreated(
infuse_id,
)
)

req: GatewayRequest
while req := await queue.get():
if isinstance(req, GatewayRequestConnectionRelease):
break
assert isinstance(req, GatewayRequestEpacketSend)
pkt: PacketOutput = req.epacket

# Encrypt payload
encr = CtypeBtGattFrame.encrypt(self._db, infuse_id, pkt.ptype, pkt.auth, pkt.payload)

if pkt.ptype in [InfuseType.RPC_CMD, InfuseType.RPC_DATA]:
uuid = InfuseBluetoothUUID.COMMAND_CHAR
else:
uuid = InfuseBluetoothUUID.DATA_CHAR

Console.log_tx(pkt.ptype, len(encr))
await client.write_gatt_char(uuid, encr, response=False)

# Queue no longer being handled
self._queues.pop(infuse_id)
Console.log_info(f"{dev}: Terminating connection")

def datagram_received(self, data: bytes, addr: tuple[str | Any, int]):
loop = asyncio.get_event_loop()
request = GatewayRequest.from_json(json.loads(data.decode("utf-8")))

# If not a connection request, attempt to forward to connection context
if not isinstance(request, GatewayRequestConnectionRequest):
if isinstance(request, GatewayRequestEpacketSend):
queue_id = request.epacket.infuse_id
elif isinstance(request, GatewayRequestConnection):
queue_id = request.infuse_id
else:
raise RuntimeError
q: asyncio.Queue | None = self._queues.get(queue_id, None)
if q is not None:
loop.call_soon(lambda: q.put_nowait(request)) # type: ignore
return

ble_dev = self._mapping.get(request.infuse_id, None)
if ble_dev is None:
self._server.broadcast(ClientNotificationConnectionFailed(request.infuse_id))
return

# Create queue for further data transfer
q = asyncio.Queue()
self._queues[request.infuse_id] = q
# Create task to handle the connection
loop.create_task(self.create_connection(request.infuse_id, ble_dev, q))

def error_received(self, exc):
Console.log_error(f"Error received: {exc}")

def connection_lost(self, exc):
Console.log_error("Connection closed")


class SubCommand(InfuseCommand):
NAME = "native_bt"
HELP = "Native Bluetooth gateway"
Expand All @@ -39,18 +178,35 @@ class SubCommand(InfuseCommand):
def add_parser(cls, parser):
pass

def __init__(self, args):
def __init__(self, args: argparse.Namespace):
self.infuse_manu = 0x0DE4
self.infuse_service = "0000fc74-0000-1000-8000-00805f9b34fb"
self.database = DeviceDatabase()
self.server = LocalServer(default_multicast_address())
self.bleak_mapping: dict[int, BLEDevice] = {}
Console.init()

async def server_handler(self):
sock = self.server._input_sock
sock.setblocking(False)
# Wrap the socket with an asyncio Datagram protocol
loop = asyncio.get_running_loop()
transport, _protocol = await loop.create_datagram_endpoint(
lambda: MulticastHandler(self.database, self.server, self.bleak_mapping),
sock=sock,
)
# Keep the server running
try:
await asyncio.Future() # Run forever
finally:
transport.close()

def simple_callback(self, device: BLEDevice, data: AdvertisementData):
addr = interface.Address(interface.Address.BluetoothLeAddr(0, BtLeAddress.integer_value(device.address)))
rssi = data.rssi
payload = data.manufacturer_data[self.infuse_manu]

hdr, decr = CtypeBtAdvFrame.decrypt(self.database, addr.val, payload)
self.bleak_mapping[hdr.device_id] = device

hop = HopReceived(
hdr.device_id,
Expand All @@ -68,16 +224,22 @@ def simple_callback(self, device: BLEDevice, data: AdvertisementData):
notification = ClientNotificationEpacketReceived(pkt)
self.server.broadcast(notification)

async def async_run(self):
self.server = LocalServer(default_multicast_address())
async def async_bt_receiver(self):
loop = asyncio.get_event_loop()
loop.create_task(self.server_handler())

scanner = BleakScanner(self.simple_callback, [self.infuse_service], cb=dict(use_bdaddr=True))
scanner = BleakScanner(self.simple_callback, [str(InfuseBluetoothUUID.SERVICE_UUID)], cb=dict(use_bdaddr=True))

while True:
Console.log_info("Starting scanner")
async with scanner:
# Run the scanner forever
await asyncio.Future()

def sync_request_handler(self):
# Loop while there are packets to send
while req := self.server.receive():
print(req)

def run(self):
asyncio.run(self.async_run())
asyncio.run(self.async_bt_receiver())
3 changes: 2 additions & 1 deletion src/infuse_iot/tools/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ def _run_standard_cmd(self):
header = rpc.RequestHeader(self._request_id, self._command.COMMAND_ID) # type: ignore
params = self._command.request_struct()

print(self._command)
request_packet = bytes(header) + bytes(params)
pkt = PacketOutput(
self._id,
Expand All @@ -166,5 +165,7 @@ def run(self):
self._run_data_cmd()
else:
self._run_standard_cmd()
except ConnectionRefusedError:
print(f"Unable to connect to {self._id:016x}")
finally:
self._client.connection_release()