Skip to content

Commit

Permalink
Fixes FutureWarning with device.rssi; uses advertisement data instead
Browse files Browse the repository at this point in the history
  • Loading branch information
bede committed Jun 13, 2023
1 parent 0f3c220 commit 24faa61
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-22.04, macos-latest]
os: [ubuntu-latest, macos-latest]
python-version: ["3.10"]
name: Python ${{ matrix.python-version }} (${{ matrix.os }})
steps:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ classifiers = [
"Operating System :: MacOS",
]
dependencies = [
"bleak >= 0.19.1",
"bleak >= 0.20.2",
"defopt >= 6.4.0",
# "pre-commit",
# "pytest",
Expand Down
13 changes: 9 additions & 4 deletions src/claranet4/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@ def dictify(obj):
return obj.__dict__


def scan():
def scan(*, timeout: int = 5):
"""
Show nearby Bluetooth devices
:arg timeout: listening timeout in seconds
"""
devices = lib.scan()
devices = lib.scan(timeout=timeout)
print(json.dumps(devices, default=dictify, indent=4))


def discover(*, substring: str = "Aranet4"):
def discover(*, substring: str = "Aranet4", timeout: int = 5):
"""
Discover nearby Aranet4 devices
:arg substring: device name substring used to identify Aranet4s
:arg timeout: listening timeout in seconds
"""
ara4_devices = lib.discover_ara4s()
print(json.dumps(ara4_devices, default=dictify, indent=4))
Expand All @@ -34,17 +37,19 @@ def read(
*,
field: Literal["co2", "temperature", "pressure", "humidity", ""] = "",
quiet: bool = False,
timeout: int = 5,
):
"""
Request latest measurements from a nearby Aranet4 device
:arg address: target device address
:arg field: return a specified field value only
:arg quiet: suppress notices on stderr
:arg timeout: listening timeout in seconds
"""
if quiet:
logging.getLogger().setLevel("WARNING")
reading = lib.read(address)
reading = lib.read(address, timeout=timeout)
if field:
print(getattr(reading, field))
else:
Expand Down
19 changes: 10 additions & 9 deletions src/claranet4/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ def le16(data: bytearray, start: int = 0) -> int:
return raw[start] + (raw[start + 1] << 8)


async def discover() -> list[Device]:
async def discover(timeout: int = 5):
"""Return list of Devices sorted by descending RSSI dBm"""
discoveries = await BleakScanner.discover(timeout=timeout, return_adv=True)
devices = [
Device(address=d.address, name=str(d.name), rssi=d.rssi)
for d in await BleakScanner.discover()
Device(address=d.address, name=str(d.name), rssi=a.rssi)
for d, a in discoveries.values()
]
logging.info(f"Found {len(devices)} device(s)")
return sorted(devices, key=lambda d: d.rssi, reverse=True)
Expand All @@ -63,14 +64,14 @@ async def request_measurements(address: str) -> bytearray:
return await client.read_gatt_char(UUID_CURRENT_MEASUREMENTS_SIMPLE)


def scan() -> list[Device]:
def scan(timeout: int = 5) -> list[Device]:
"""Show Bluetooth devices in the vicinity"""
return asyncio.run(discover())
return asyncio.run(discover(timeout=timeout))


def discover_ara4s(substring: str = "Aranet4") -> list[Device]:
def discover_ara4s(substring: str = "Aranet4", timeout: int = 5) -> list[Device]:
"""Find Aranet4s in the vicinity"""
devices = scan()
devices = scan(timeout=timeout)
ara4_devices = [d for d in devices if substring in d.name]
logging.info(f"Found {len(ara4_devices)} Aranet4 device(s)")
return ara4_devices
Expand All @@ -85,11 +86,11 @@ def find_device(address) -> Device:
raise DeviceError(f"Could not find device {address}")


def read(address: str = "") -> Reading:
def read(address: str = "", timeout: int = 5) -> Reading:
if address:
device = find_device(address)
else:
ara4_devices = discover_ara4s()
ara4_devices = discover_ara4s(timeout=timeout)
if not ara4_devices:
raise DeviceError("No Aranet4 devices discovered")
else:
Expand Down

0 comments on commit 24faa61

Please sign in to comment.