Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ jobs:
max-parallel: 2
fail-fast: true
matrix:
# Python version first means we'll test all the versions on Ubuntu first
python-version: ['3.10', '3.11', '3.12', '3.13']
# Test the extreme Python versions for now to reduce CI runtime
os: [ubuntu-latest, windows-latest]
python-version: ['3.10', '3.13']
steps:
- uses: actions/checkout@v4
# This is enough to find many quoting issues
Expand Down
34 changes: 31 additions & 3 deletions src/infuse_iot/socket_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ class ClientNotificationConnectionCreated(ClientNotificationConnection):

TYPE = ClientNotificationConnection.Type.CONNECTION_CREATED

def __init__(self, infuse_id: int, max_payload: int):
super().__init__(infuse_id)
self.max_payload = max_payload

def to_json(self) -> dict:
return {"type": int(self.TYPE), "infuse_id": self.infuse_id, "max_payload": self.max_payload}

@classmethod
def from_json(cls, values: dict) -> Self:
return cls(values["infuse_id"], values["max_payload"])


class ClientNotificationConnectionDropped(ClientNotificationConnection):
"""Connection to device has been lost"""
Expand Down Expand Up @@ -145,6 +156,22 @@ class GatewayRequestConnectionRequest(GatewayRequestConnection):

TYPE = GatewayRequestConnection.Type.CONNECTION_REQUEST

class DataType(enum.IntFlag):
COMMAND = 1
DATA = 2
LOGGING = 4

def __init__(self, infuse_id: int, data_types: DataType):
super().__init__(infuse_id)
self.data_types = data_types

def to_json(self) -> dict:
return {"type": int(self.TYPE), "infuse_id": self.infuse_id, "data_types": self.data_types}

@classmethod
def from_json(cls, values: dict) -> Self:
return cls(values["infuse_id"], values["data_types"])


class GatewayRequestConnectionRelease(GatewayRequestConnection):
"""Release connection context to device"""
Expand Down Expand Up @@ -207,18 +234,19 @@ def receive(self) -> ClientNotification | None:
return None
return ClientNotification.from_json(json.loads(data.decode("utf-8")))

def connection_create(self, infuse_id: int):
def connection_create(self, infuse_id: int, data_types: GatewayRequestConnectionRequest.DataType) -> int:
self._connection_id = infuse_id

# Send the request for the connection
req = GatewayRequestConnectionRequest(infuse_id)
req = GatewayRequestConnectionRequest(infuse_id, data_types)
self.send(req)
# Wait for response from the server
while rsp := self.receive():
if isinstance(rsp, ClientNotificationConnectionCreated):
break
return rsp.max_payload
elif isinstance(rsp, ClientNotificationConnectionFailed):
raise ConnectionRefusedError
raise ConnectionRefusedError

def connection_release(self):
assert self._connection_id is not None
Expand Down
49 changes: 49 additions & 0 deletions src/infuse_iot/tools/bt_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python3

"""Connect to remote Bluetooth device serial logs"""

__author__ = "Jordan Yates"
__copyright__ = "Copyright 2024, Embeint Inc"


from infuse_iot.commands import InfuseCommand
from infuse_iot.common import InfuseType
from infuse_iot.socket_comms import (
ClientNotificationConnectionDropped,
ClientNotificationEpacketReceived,
GatewayRequestConnectionRequest,
LocalClient,
default_multicast_address,
)


class SubCommand(InfuseCommand):
NAME = "bt_log"
HELP = "Connect to remote Bluetooth device serial logs"
DESCRIPTION = "Connect to remote Bluetooth device serial logs"

def __init__(self, args):
self._client = LocalClient(default_multicast_address(), 60.0)
self._id = args.id

@classmethod
def add_parser(cls, parser):
parser.add_argument("--id", type=lambda x: int(x, 0), help="Infuse ID to receive logs for")

def run(self):
try:
self._client.connection_create(self._id, GatewayRequestConnectionRequest.DataType.LOGGING)

while rsp := self._client.receive():
if isinstance(rsp, ClientNotificationConnectionDropped):
print(f"Connection to {self._id:016x} lost")
break
if isinstance(rsp, ClientNotificationEpacketReceived) and rsp.epacket.ptype == InfuseType.SERIAL_LOG:
print(rsp.epacket.payload.decode("utf-8"), end="")

except KeyboardInterrupt:
print(f"Disconnecting from {self._id:016x}")
except ConnectionRefusedError:
print(f"Unable to connect to {self._id:016x}")
finally:
self._client.connection_release()
15 changes: 12 additions & 3 deletions src/infuse_iot/tools/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from infuse_iot.epacket.packet import (
Auth,
CtypeBtGattFrame,
HopOutput,
PacketOutputRouted,
PacketReceived,
Expand Down Expand Up @@ -289,26 +290,34 @@ def _bt_connect_cb(self, pkt: PacketReceived, rc: int, response: bytes):
if rc < 0:
rsp = ClientNotificationConnectionFailed(infuse_id)
else:
rsp = ClientNotificationConnectionCreated(infuse_id)
rsp = ClientNotificationConnectionCreated(infuse_id, 244 - ctypes.sizeof(CtypeBtGattFrame) - 16)
self._common.server.broadcast(rsp)

def _handle_conn_request(self, req: GatewayRequestConnectionRequest):
assert self._common.server is not None

if req.infuse_id == InfuseID.GATEWAY:
# Local gateway always connected
self._common.server.broadcast(ClientNotificationConnectionCreated(req.infuse_id))
self._common.server.broadcast(ClientNotificationConnectionCreated(req.infuse_id, 512))
return

state = self._common.ddb.devices.get(req.infuse_id, None)
if state is None or state.bt_addr is None:
self._common.server.broadcast(ClientNotificationConnectionFailed(req.infuse_id))
return

subs = 0
if req.data_types & req.DataType.COMMAND:
subs |= defs.rpc_enum_infuse_bt_characteristic.COMMAND
if req.data_types & req.DataType.DATA:
subs |= defs.rpc_enum_infuse_bt_characteristic.DATA
if req.data_types & req.DataType.LOGGING:
subs |= defs.rpc_enum_infuse_bt_characteristic.LOGGING

connect_args = defs.bt_connect_infuse.request(
state.bt_addr.to_rpc_struct(),
10000,
defs.rpc_enum_infuse_bt_characteristic.COMMAND,
subs,
0,
)
cmd = self._common.rpc.generate(
Expand Down
24 changes: 16 additions & 8 deletions src/infuse_iot/tools/native_bt.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,36 @@ def notification_handler(self, _characteristic: BleakGATTCharacteristic, data: b
Console.log_rx(pkt.ptype, len(data))
self._server.broadcast(ClientNotificationEpacketReceived(pkt))

async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio.Queue):
async def create_connection(self, request: GatewayRequestConnectionRequest, dev: BLEDevice, queue: asyncio.Queue):
Console.log_info(f"{dev}: Initiating connection")
async with BleakClient(dev) as client:
# Modified from bleak example code
if client._backend.__class__.__name__ == "BleakClientBlueZDBus":
await client._backend._acquire_mtu() # type: ignore
await client._backend._acquire_mtu() # type: ignore

security_info = await client.read_gatt_char(InfuseBluetoothUUID.COMMAND_CHAR)
resp = InfuseGattReadResponse.from_buffer_copy(security_info)
self._db.observe_security_state(
infuse_id,
request.infuse_id,
bytes(resp.cloud_public_key),
bytes(resp.device_public_key),
resp.network_id,
)

await client.start_notify(InfuseBluetoothUUID.COMMAND_CHAR, self.notification_handler)
if request.data_types & request.DataType.COMMAND:
await client.start_notify(InfuseBluetoothUUID.COMMAND_CHAR, self.notification_handler)
if request.data_types & request.DataType.DATA:
await client.start_notify(InfuseBluetoothUUID.DATA_CHAR, self.notification_handler)
if request.data_types & request.DataType.LOGGING:
await client.start_notify(InfuseBluetoothUUID.LOGGING_CHAR, self.notification_handler)

Console.log_info(f"{dev}: Connected (MTU {client.mtu_size})")

self._server.broadcast(
ClientNotificationConnectionCreated(
infuse_id,
request.infuse_id,
# ATT header uses 3 bytes of the MTU
client.mtu_size - 3 - ctypes.sizeof(CtypeBtGattFrame) - 16,
)
)

Expand All @@ -120,7 +128,7 @@ async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio
pkt: PacketOutput = req.epacket

# Encrypt payload
encr = CtypeBtGattFrame.encrypt(self._db, infuse_id, pkt.ptype, pkt.auth, pkt.payload)
encr = CtypeBtGattFrame.encrypt(self._db, request.infuse_id, pkt.ptype, pkt.auth, pkt.payload)

if pkt.ptype in [InfuseType.RPC_CMD, InfuseType.RPC_DATA]:
uuid = InfuseBluetoothUUID.COMMAND_CHAR
Expand All @@ -131,7 +139,7 @@ async def create_connection(self, infuse_id: int, dev: BLEDevice, queue: asyncio
await client.write_gatt_char(uuid, encr, response=False)

# Queue no longer being handled
self._queues.pop(infuse_id)
self._queues.pop(request.infuse_id)
Console.log_info(f"{dev}: Terminating connection")

def datagram_received(self, data: bytes, addr: tuple[str | Any, int]):
Expand Down Expand Up @@ -160,7 +168,7 @@ def datagram_received(self, data: bytes, addr: tuple[str | Any, int]):
q = asyncio.Queue()
self._queues[request.infuse_id] = q
# Create task to handle the connection
loop.create_task(self.create_connection(request.infuse_id, ble_dev, q))
loop.create_task(self.create_connection(request, ble_dev, q))

def error_received(self, exc):
Console.log_error(f"Error received: {exc}")
Expand Down
12 changes: 9 additions & 3 deletions src/infuse_iot/tools/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from infuse_iot.socket_comms import (
ClientNotificationConnectionDropped,
ClientNotificationEpacketReceived,
GatewayRequestConnectionRequest,
GatewayRequestEpacketSend,
LocalClient,
default_multicast_address,
Expand Down Expand Up @@ -57,6 +58,7 @@ def __init__(self, args: argparse.Namespace):
self._client = LocalClient(default_multicast_address(), 10.0)
self._command: InfuseRpcCommand = args.rpc_class(args)
self._request_id = random.randint(0, 2**32 - 1)
self._max_payload = 0
if args.gateway:
self._id = InfuseID.GATEWAY
else:
Expand Down Expand Up @@ -114,10 +116,12 @@ def _run_data_send_cmd(self):
# Wait for initial ACK
self._wait_data_ack()

# Send data payloads (384 byte chunks for now)
# Send data payloads with maximum interface size
ack_cnt = -ack_period
offset = 0
size = 192
size = self._max_payload - ctypes.sizeof(rpc.DataHeader)
# Round payload down to multiple of 4 bytes
size -= size % 4
while len(data) > 0:
size = min(size, len(data))
payload = data[:size]
Expand Down Expand Up @@ -202,7 +206,9 @@ def _run_standard_cmd(self):

def run(self):
try:
self._client.connection_create(self._id)
self._max_payload = self._client.connection_create(
self._id, GatewayRequestConnectionRequest.DataType.COMMAND
)
if self._command.RPC_DATA_SEND:
self._run_data_send_cmd()
elif self._command.RPC_DATA_RECEIVE:
Expand Down
Loading