diff --git a/src/infuse_iot/common.py b/src/infuse_iot/common.py index 5986620..5fb6167 100644 --- a/src/infuse_iot/common.py +++ b/src/infuse_iot/common.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import enum +import uuid class InfuseType(enum.IntEnum): @@ -26,3 +27,10 @@ class InfuseID(enum.IntEnum): """Hardcoded Infuse IDs""" GATEWAY = -1 + + +class InfuseBluetoothUUID: + SERVICE_UUID = uuid.UUID("0000fc74-0000-1000-8000-00805f9b34fb") + COMMAND_CHAR = uuid.UUID("dc0b71b7-fc74-fc74-aa01-8aba434a893d") + DATA_CHAR = uuid.UUID("dc0b71b7-fc74-fc74-aa02-8aba434a893d") + LOGGING_CHAR = uuid.UUID("dc0b71b7-fc74-fc74-aa03-8aba434a893d") diff --git a/src/infuse_iot/epacket/packet.py b/src/infuse_iot/epacket/packet.py index e6c8993..5a56a5a 100644 --- a/src/infuse_iot/epacket/packet.py +++ b/src/infuse_iot/epacket/packet.py @@ -468,7 +468,7 @@ def encrypt( return header_bytes + ciphertext @classmethod - def decrypt(cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr, frame: bytes): + def decrypt(cls, database: DeviceDatabase, bt_addr: Address.BluetoothLeAddr | None, frame: bytes): header = cls.from_buffer_copy(frame) if header.flags & Flags.ENCR_DEVICE: database.observe_device(header.device_id, device_id=header.key_metadata, bt_addr=bt_addr) diff --git a/src/infuse_iot/time.py b/src/infuse_iot/time.py index 076c310..3499ad6 100644 --- a/src/infuse_iot/time.py +++ b/src/infuse_iot/time.py @@ -4,11 +4,11 @@ import enum -class InfuseTimeSource(enum.IntFlag): +class InfuseTimeSource(enum.IntEnum): NONE = 0 - GNSS = 0x01 - NTP = 0x02 - RPC = 0x04 + GNSS = 1 + NTP = 2 + RPC = 3 RECOVERED = 0x80 def __str__(self) -> str: diff --git a/src/infuse_iot/tools/native_bt.py b/src/infuse_iot/tools/native_bt.py index efb17c9..3750a33 100644 --- a/src/infuse_iot/tools/native_bt.py +++ b/src/infuse_iot/tools/native_bt.py @@ -5,24 +5,39 @@ __author__ = "Jordan Yates" __copyright__ = "Copyright 2024, Embeint Inc" +import argparse import asyncio +import ctypes +import json +from typing import Any -from bleak import BleakScanner +from bleak import BleakClient, BleakScanner +from bleak.backends.characteristic import BleakGATTCharacteristic from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData -import infuse_iot.epacket.interface as interface from infuse_iot.commands import InfuseCommand +from infuse_iot.common import InfuseBluetoothUUID, InfuseType from infuse_iot.database import DeviceDatabase +from infuse_iot.epacket import interface from infuse_iot.epacket.packet import ( Auth, CtypeBtAdvFrame, + CtypeBtGattFrame, Flags, HopReceived, + PacketOutput, PacketReceived, ) from infuse_iot.socket_comms import ( + ClientNotificationConnectionCreated, + ClientNotificationConnectionFailed, ClientNotificationEpacketReceived, + GatewayRequest, + GatewayRequestConnection, + GatewayRequestConnectionRelease, + GatewayRequestConnectionRequest, + GatewayRequestEpacketSend, LocalServer, default_multicast_address, ) @@ -30,6 +45,130 @@ from infuse_iot.util.console import Console +class InfuseGattReadResponse(ctypes.LittleEndianStructure): + """Response to any read request on Infuse-IoT characteristics""" + + _fields_ = [ + ("cloud_public_key", 32 * ctypes.c_uint8), + ("device_public_key", 32 * ctypes.c_uint8), + ("network_id", ctypes.c_uint32), + ] + _pack_ = 1 + + +class MulticastHandler(asyncio.DatagramProtocol): + def __init__(self, database: DeviceDatabase, server: LocalServer, bleak_mapping: dict[int, BLEDevice]): + self._db = database + self._server = server + self._mapping = bleak_mapping + self._queues: dict[int, asyncio.Queue] = {} + + def notification_handler(self, _characteristic: BleakGATTCharacteristic, data: bytearray): + hdr, decr = CtypeBtGattFrame.decrypt(self._db, None, bytes(data)) + # Correct values are annoying to get here + if_addr = interface.Address(interface.Address.BluetoothLeAddr(0, 0)) + rssi = 0 + + bt_hop = HopReceived( + hdr.device_id, + interface.ID.BT_CENTRAL, + if_addr, + (Auth.DEVICE if hdr.flags & Flags.ENCR_DEVICE else Auth.NETWORK), + hdr.key_metadata, + hdr.gps_time, + hdr.sequence, + rssi, + ) + pkt = PacketReceived( + [bt_hop], + hdr.type, + bytes(decr), + ) + Console.log_rx(pkt.ptype, len(data)) + self._server.broadcast(ClientNotificationEpacketReceived(pkt)) + + async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio.Queue): + Console.log_info(f"{dev}: Initiating connection") + async with BleakClient(dev) as client: + # Modified from bleak example code + if client._backend.__class__.__name__ == "BleakClientBlueZDBus": + await client._backend._acquire_mtu() # type: ignore + + security_info = await client.read_gatt_char(InfuseBluetoothUUID.COMMAND_CHAR) + resp = InfuseGattReadResponse.from_buffer_copy(security_info) + self._db.observe_security_state( + infuse_id, + bytes(resp.cloud_public_key), + bytes(resp.device_public_key), + resp.network_id, + ) + + await client.start_notify(InfuseBluetoothUUID.COMMAND_CHAR, self.notification_handler) + Console.log_info(f"{dev}: Connected (MTU {client.mtu_size})") + + self._server.broadcast( + ClientNotificationConnectionCreated( + infuse_id, + ) + ) + + req: GatewayRequest + while req := await queue.get(): + if isinstance(req, GatewayRequestConnectionRelease): + break + assert isinstance(req, GatewayRequestEpacketSend) + pkt: PacketOutput = req.epacket + + # Encrypt payload + encr = CtypeBtGattFrame.encrypt(self._db, infuse_id, pkt.ptype, pkt.auth, pkt.payload) + + if pkt.ptype in [InfuseType.RPC_CMD, InfuseType.RPC_DATA]: + uuid = InfuseBluetoothUUID.COMMAND_CHAR + else: + uuid = InfuseBluetoothUUID.DATA_CHAR + + Console.log_tx(pkt.ptype, len(encr)) + await client.write_gatt_char(uuid, encr, response=False) + + # Queue no longer being handled + self._queues.pop(infuse_id) + Console.log_info(f"{dev}: Terminating connection") + + def datagram_received(self, data: bytes, addr: tuple[str | Any, int]): + loop = asyncio.get_event_loop() + request = GatewayRequest.from_json(json.loads(data.decode("utf-8"))) + + # If not a connection request, attempt to forward to connection context + if not isinstance(request, GatewayRequestConnectionRequest): + if isinstance(request, GatewayRequestEpacketSend): + queue_id = request.epacket.infuse_id + elif isinstance(request, GatewayRequestConnection): + queue_id = request.infuse_id + else: + raise RuntimeError + q: asyncio.Queue | None = self._queues.get(queue_id, None) + if q is not None: + loop.call_soon(lambda: q.put_nowait(request)) # type: ignore + return + + ble_dev = self._mapping.get(request.infuse_id, None) + if ble_dev is None: + self._server.broadcast(ClientNotificationConnectionFailed(request.infuse_id)) + return + + # Create queue for further data transfer + q = asyncio.Queue() + self._queues[request.infuse_id] = q + # Create task to handle the connection + loop.create_task(self.create_connection(request.infuse_id, ble_dev, q)) + + def error_received(self, exc): + Console.log_error(f"Error received: {exc}") + + def connection_lost(self, exc): + Console.log_error("Connection closed") + + class SubCommand(InfuseCommand): NAME = "native_bt" HELP = "Native Bluetooth gateway" @@ -39,18 +178,35 @@ class SubCommand(InfuseCommand): def add_parser(cls, parser): pass - def __init__(self, args): + def __init__(self, args: argparse.Namespace): self.infuse_manu = 0x0DE4 - self.infuse_service = "0000fc74-0000-1000-8000-00805f9b34fb" self.database = DeviceDatabase() + self.server = LocalServer(default_multicast_address()) + self.bleak_mapping: dict[int, BLEDevice] = {} Console.init() + async def server_handler(self): + sock = self.server._input_sock + sock.setblocking(False) + # Wrap the socket with an asyncio Datagram protocol + loop = asyncio.get_running_loop() + transport, _protocol = await loop.create_datagram_endpoint( + lambda: MulticastHandler(self.database, self.server, self.bleak_mapping), + sock=sock, + ) + # Keep the server running + try: + await asyncio.Future() # Run forever + finally: + transport.close() + def simple_callback(self, device: BLEDevice, data: AdvertisementData): addr = interface.Address(interface.Address.BluetoothLeAddr(0, BtLeAddress.integer_value(device.address))) rssi = data.rssi payload = data.manufacturer_data[self.infuse_manu] hdr, decr = CtypeBtAdvFrame.decrypt(self.database, addr.val, payload) + self.bleak_mapping[hdr.device_id] = device hop = HopReceived( hdr.device_id, @@ -68,10 +224,11 @@ def simple_callback(self, device: BLEDevice, data: AdvertisementData): notification = ClientNotificationEpacketReceived(pkt) self.server.broadcast(notification) - async def async_run(self): - self.server = LocalServer(default_multicast_address()) + async def async_bt_receiver(self): + loop = asyncio.get_event_loop() + loop.create_task(self.server_handler()) - scanner = BleakScanner(self.simple_callback, [self.infuse_service], cb=dict(use_bdaddr=True)) + scanner = BleakScanner(self.simple_callback, [str(InfuseBluetoothUUID.SERVICE_UUID)], cb=dict(use_bdaddr=True)) while True: Console.log_info("Starting scanner") @@ -79,5 +236,10 @@ async def async_run(self): # Run the scanner forever await asyncio.Future() + def sync_request_handler(self): + # Loop while there are packets to send + while req := self.server.receive(): + print(req) + def run(self): - asyncio.run(self.async_run()) + asyncio.run(self.async_bt_receiver()) diff --git a/src/infuse_iot/tools/rpc.py b/src/infuse_iot/tools/rpc.py index 37e88f7..468bbf0 100644 --- a/src/infuse_iot/tools/rpc.py +++ b/src/infuse_iot/tools/rpc.py @@ -147,7 +147,6 @@ def _run_standard_cmd(self): header = rpc.RequestHeader(self._request_id, self._command.COMMAND_ID) # type: ignore params = self._command.request_struct() - print(self._command) request_packet = bytes(header) + bytes(params) pkt = PacketOutput( self._id, @@ -166,5 +165,7 @@ def run(self): self._run_data_cmd() else: self._run_standard_cmd() + except ConnectionRefusedError: + print(f"Unable to connect to {self._id:016x}") finally: self._client.connection_release()