diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ccfd627..d9476aa 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,9 +15,9 @@ jobs: max-parallel: 2 fail-fast: true matrix: - # Python version first means we'll test all the versions on Ubuntu first - python-version: ['3.10', '3.11', '3.12', '3.13'] + # Test the extreme Python versions for now to reduce CI runtime os: [ubuntu-latest, windows-latest] + python-version: ['3.10', '3.13'] steps: - uses: actions/checkout@v4 # This is enough to find many quoting issues diff --git a/src/infuse_iot/socket_comms.py b/src/infuse_iot/socket_comms.py index 7428621..232a45c 100644 --- a/src/infuse_iot/socket_comms.py +++ b/src/infuse_iot/socket_comms.py @@ -81,6 +81,17 @@ class ClientNotificationConnectionCreated(ClientNotificationConnection): TYPE = ClientNotificationConnection.Type.CONNECTION_CREATED + def __init__(self, infuse_id: int, max_payload: int): + super().__init__(infuse_id) + self.max_payload = max_payload + + def to_json(self) -> dict: + return {"type": int(self.TYPE), "infuse_id": self.infuse_id, "max_payload": self.max_payload} + + @classmethod + def from_json(cls, values: dict) -> Self: + return cls(values["infuse_id"], values["max_payload"]) + class ClientNotificationConnectionDropped(ClientNotificationConnection): """Connection to device has been lost""" @@ -145,6 +156,22 @@ class GatewayRequestConnectionRequest(GatewayRequestConnection): TYPE = GatewayRequestConnection.Type.CONNECTION_REQUEST + class DataType(enum.IntFlag): + COMMAND = 1 + DATA = 2 + LOGGING = 4 + + def __init__(self, infuse_id: int, data_types: DataType): + super().__init__(infuse_id) + self.data_types = data_types + + def to_json(self) -> dict: + return {"type": int(self.TYPE), "infuse_id": self.infuse_id, "data_types": self.data_types} + + @classmethod + def from_json(cls, values: dict) -> Self: + return cls(values["infuse_id"], values["data_types"]) + class GatewayRequestConnectionRelease(GatewayRequestConnection): """Release connection context to device""" @@ -207,18 +234,19 @@ def receive(self) -> ClientNotification | None: return None return ClientNotification.from_json(json.loads(data.decode("utf-8"))) - def connection_create(self, infuse_id: int): + def connection_create(self, infuse_id: int, data_types: GatewayRequestConnectionRequest.DataType) -> int: self._connection_id = infuse_id # Send the request for the connection - req = GatewayRequestConnectionRequest(infuse_id) + req = GatewayRequestConnectionRequest(infuse_id, data_types) self.send(req) # Wait for response from the server while rsp := self.receive(): if isinstance(rsp, ClientNotificationConnectionCreated): - break + return rsp.max_payload elif isinstance(rsp, ClientNotificationConnectionFailed): raise ConnectionRefusedError + raise ConnectionRefusedError def connection_release(self): assert self._connection_id is not None diff --git a/src/infuse_iot/tools/bt_log.py b/src/infuse_iot/tools/bt_log.py new file mode 100644 index 0000000..b5d27cf --- /dev/null +++ b/src/infuse_iot/tools/bt_log.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 + +"""Connect to remote Bluetooth device serial logs""" + +__author__ = "Jordan Yates" +__copyright__ = "Copyright 2024, Embeint Inc" + + +from infuse_iot.commands import InfuseCommand +from infuse_iot.common import InfuseType +from infuse_iot.socket_comms import ( + ClientNotificationConnectionDropped, + ClientNotificationEpacketReceived, + GatewayRequestConnectionRequest, + LocalClient, + default_multicast_address, +) + + +class SubCommand(InfuseCommand): + NAME = "bt_log" + HELP = "Connect to remote Bluetooth device serial logs" + DESCRIPTION = "Connect to remote Bluetooth device serial logs" + + def __init__(self, args): + self._client = LocalClient(default_multicast_address(), 60.0) + self._id = args.id + + @classmethod + def add_parser(cls, parser): + parser.add_argument("--id", type=lambda x: int(x, 0), help="Infuse ID to receive logs for") + + def run(self): + try: + self._client.connection_create(self._id, GatewayRequestConnectionRequest.DataType.LOGGING) + + while rsp := self._client.receive(): + if isinstance(rsp, ClientNotificationConnectionDropped): + print(f"Connection to {self._id:016x} lost") + break + if isinstance(rsp, ClientNotificationEpacketReceived) and rsp.epacket.ptype == InfuseType.SERIAL_LOG: + print(rsp.epacket.payload.decode("utf-8"), end="") + + except KeyboardInterrupt: + print(f"Disconnecting from {self._id:016x}") + except ConnectionRefusedError: + print(f"Unable to connect to {self._id:016x}") + finally: + self._client.connection_release() diff --git a/src/infuse_iot/tools/gateway.py b/src/infuse_iot/tools/gateway.py index 06e9851..5ca15b4 100644 --- a/src/infuse_iot/tools/gateway.py +++ b/src/infuse_iot/tools/gateway.py @@ -29,6 +29,7 @@ ) from infuse_iot.epacket.packet import ( Auth, + CtypeBtGattFrame, HopOutput, PacketOutputRouted, PacketReceived, @@ -289,7 +290,7 @@ def _bt_connect_cb(self, pkt: PacketReceived, rc: int, response: bytes): if rc < 0: rsp = ClientNotificationConnectionFailed(infuse_id) else: - rsp = ClientNotificationConnectionCreated(infuse_id) + rsp = ClientNotificationConnectionCreated(infuse_id, 244 - ctypes.sizeof(CtypeBtGattFrame) - 16) self._common.server.broadcast(rsp) def _handle_conn_request(self, req: GatewayRequestConnectionRequest): @@ -297,7 +298,7 @@ def _handle_conn_request(self, req: GatewayRequestConnectionRequest): if req.infuse_id == InfuseID.GATEWAY: # Local gateway always connected - self._common.server.broadcast(ClientNotificationConnectionCreated(req.infuse_id)) + self._common.server.broadcast(ClientNotificationConnectionCreated(req.infuse_id, 512)) return state = self._common.ddb.devices.get(req.infuse_id, None) @@ -305,10 +306,18 @@ def _handle_conn_request(self, req: GatewayRequestConnectionRequest): self._common.server.broadcast(ClientNotificationConnectionFailed(req.infuse_id)) return + subs = 0 + if req.data_types & req.DataType.COMMAND: + subs |= defs.rpc_enum_infuse_bt_characteristic.COMMAND + if req.data_types & req.DataType.DATA: + subs |= defs.rpc_enum_infuse_bt_characteristic.DATA + if req.data_types & req.DataType.LOGGING: + subs |= defs.rpc_enum_infuse_bt_characteristic.LOGGING + connect_args = defs.bt_connect_infuse.request( state.bt_addr.to_rpc_struct(), 10000, - defs.rpc_enum_infuse_bt_characteristic.COMMAND, + subs, 0, ) cmd = self._common.rpc.generate( diff --git a/src/infuse_iot/tools/native_bt.py b/src/infuse_iot/tools/native_bt.py index 3750a33..1ae0d40 100644 --- a/src/infuse_iot/tools/native_bt.py +++ b/src/infuse_iot/tools/native_bt.py @@ -87,28 +87,36 @@ def notification_handler(self, _characteristic: BleakGATTCharacteristic, data: b Console.log_rx(pkt.ptype, len(data)) self._server.broadcast(ClientNotificationEpacketReceived(pkt)) - async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio.Queue): + async def create_connection(self, request: GatewayRequestConnectionRequest, dev: BLEDevice, queue: asyncio.Queue): Console.log_info(f"{dev}: Initiating connection") async with BleakClient(dev) as client: # Modified from bleak example code if client._backend.__class__.__name__ == "BleakClientBlueZDBus": - await client._backend._acquire_mtu() # type: ignore + await client._backend._acquire_mtu() # type: ignore security_info = await client.read_gatt_char(InfuseBluetoothUUID.COMMAND_CHAR) resp = InfuseGattReadResponse.from_buffer_copy(security_info) self._db.observe_security_state( - infuse_id, + request.infuse_id, bytes(resp.cloud_public_key), bytes(resp.device_public_key), resp.network_id, ) - await client.start_notify(InfuseBluetoothUUID.COMMAND_CHAR, self.notification_handler) + if request.data_types & request.DataType.COMMAND: + await client.start_notify(InfuseBluetoothUUID.COMMAND_CHAR, self.notification_handler) + if request.data_types & request.DataType.DATA: + await client.start_notify(InfuseBluetoothUUID.DATA_CHAR, self.notification_handler) + if request.data_types & request.DataType.LOGGING: + await client.start_notify(InfuseBluetoothUUID.LOGGING_CHAR, self.notification_handler) + Console.log_info(f"{dev}: Connected (MTU {client.mtu_size})") self._server.broadcast( ClientNotificationConnectionCreated( - infuse_id, + request.infuse_id, + # ATT header uses 3 bytes of the MTU + client.mtu_size - 3 - ctypes.sizeof(CtypeBtGattFrame) - 16, ) ) @@ -120,7 +128,7 @@ async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio pkt: PacketOutput = req.epacket # Encrypt payload - encr = CtypeBtGattFrame.encrypt(self._db, infuse_id, pkt.ptype, pkt.auth, pkt.payload) + encr = CtypeBtGattFrame.encrypt(self._db, request.infuse_id, pkt.ptype, pkt.auth, pkt.payload) if pkt.ptype in [InfuseType.RPC_CMD, InfuseType.RPC_DATA]: uuid = InfuseBluetoothUUID.COMMAND_CHAR @@ -131,7 +139,7 @@ async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio await client.write_gatt_char(uuid, encr, response=False) # Queue no longer being handled - self._queues.pop(infuse_id) + self._queues.pop(request.infuse_id) Console.log_info(f"{dev}: Terminating connection") def datagram_received(self, data: bytes, addr: tuple[str | Any, int]): @@ -160,7 +168,7 @@ def datagram_received(self, data: bytes, addr: tuple[str | Any, int]): q = asyncio.Queue() self._queues[request.infuse_id] = q # Create task to handle the connection - loop.create_task(self.create_connection(request.infuse_id, ble_dev, q)) + loop.create_task(self.create_connection(request, ble_dev, q)) def error_received(self, exc): Console.log_error(f"Error received: {exc}") diff --git a/src/infuse_iot/tools/rpc.py b/src/infuse_iot/tools/rpc.py index 7d22a86..c1a034a 100644 --- a/src/infuse_iot/tools/rpc.py +++ b/src/infuse_iot/tools/rpc.py @@ -19,6 +19,7 @@ from infuse_iot.socket_comms import ( ClientNotificationConnectionDropped, ClientNotificationEpacketReceived, + GatewayRequestConnectionRequest, GatewayRequestEpacketSend, LocalClient, default_multicast_address, @@ -57,6 +58,7 @@ def __init__(self, args: argparse.Namespace): self._client = LocalClient(default_multicast_address(), 10.0) self._command: InfuseRpcCommand = args.rpc_class(args) self._request_id = random.randint(0, 2**32 - 1) + self._max_payload = 0 if args.gateway: self._id = InfuseID.GATEWAY else: @@ -114,10 +116,12 @@ def _run_data_send_cmd(self): # Wait for initial ACK self._wait_data_ack() - # Send data payloads (384 byte chunks for now) + # Send data payloads with maximum interface size ack_cnt = -ack_period offset = 0 - size = 192 + size = self._max_payload - ctypes.sizeof(rpc.DataHeader) + # Round payload down to multiple of 4 bytes + size -= size % 4 while len(data) > 0: size = min(size, len(data)) payload = data[:size] @@ -202,7 +206,9 @@ def _run_standard_cmd(self): def run(self): try: - self._client.connection_create(self._id) + self._max_payload = self._client.connection_create( + self._id, GatewayRequestConnectionRequest.DataType.COMMAND + ) if self._command.RPC_DATA_SEND: self._run_data_send_cmd() elif self._command.RPC_DATA_RECEIVE: