-
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: port advertisement_tracker (#11)
- Loading branch information
Showing
5 changed files
with
97 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
import cython | ||
|
||
cdef class AdvertisementTracker: | ||
|
||
cdef public dict intervals | ||
cdef public dict fallback_intervals | ||
cdef public dict sources | ||
cdef public dict _timings | ||
|
||
@cython.locals(timings=list) | ||
cpdef async_collect(self, object service_info) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
"""The advertisement tracker.""" | ||
from __future__ import annotations | ||
|
||
from typing import Any | ||
|
||
from home_assistant_bluetooth import BluetoothServiceInfoBleak | ||
|
||
ADVERTISING_TIMES_NEEDED = 16 | ||
|
||
# Each scanner may buffer incoming packets so | ||
# we need to give a bit of leeway before we | ||
# mark a device unavailable | ||
TRACKER_BUFFERING_WOBBLE_SECONDS = 5 | ||
|
||
|
||
class AdvertisementTracker: | ||
"""Tracker to determine the interval that a device is advertising.""" | ||
|
||
__slots__ = ("intervals", "fallback_intervals", "sources", "_timings") | ||
|
||
def __init__(self) -> None: | ||
"""Initialize the tracker.""" | ||
self.intervals: dict[str, float] = {} | ||
self.fallback_intervals: dict[str, float] = {} | ||
self.sources: dict[str, str] = {} | ||
self._timings: dict[str, list[float]] = {} | ||
|
||
def async_diagnostics(self) -> dict[str, dict[str, Any]]: | ||
"""Return diagnostics.""" | ||
return { | ||
"intervals": self.intervals, | ||
"fallback_intervals": self.fallback_intervals, | ||
"sources": self.sources, | ||
"timings": self._timings, | ||
} | ||
|
||
def async_collect(self, service_info: BluetoothServiceInfoBleak) -> None: | ||
""" | ||
Collect timings for the tracker. | ||
For performance reasons, it is the responsibility of the | ||
caller to check if the device already has an interval set or | ||
the source has changed before calling this function. | ||
""" | ||
address = service_info.address | ||
self.sources[address] = service_info.source | ||
timings = self._timings.setdefault(address, []) | ||
timings.append(service_info.time) | ||
if len(timings) != ADVERTISING_TIMES_NEEDED: | ||
return | ||
|
||
max_time_between_advertisements = timings[1] - timings[0] | ||
for i in range(2, len(timings)): | ||
time_between_advertisements = timings[i] - timings[i - 1] | ||
if time_between_advertisements > max_time_between_advertisements: | ||
max_time_between_advertisements = time_between_advertisements | ||
|
||
# We now know the maximum time between advertisements | ||
self.intervals[address] = max_time_between_advertisements | ||
del self._timings[address] | ||
|
||
def async_remove_address(self, address: str) -> None: | ||
"""Remove the tracker.""" | ||
self.intervals.pop(address, None) | ||
self.sources.pop(address, None) | ||
self._timings.pop(address, None) | ||
|
||
def async_remove_fallback_interval(self, address: str) -> None: | ||
"""Remove fallback interval.""" | ||
self.fallback_intervals.pop(address, None) | ||
|
||
def async_remove_source(self, source: str) -> None: | ||
"""Remove the tracker.""" | ||
for address, tracked_source in list(self.sources.items()): | ||
if tracked_source == source: | ||
self.async_remove_address(address) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
|
||
import cython | ||
|
||
cdef object NO_RSSI_VALUE | ||
cdef object BluetoothServiceInfoBleak | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,6 @@ | ||
import cython | ||
|
||
|
||
from .base_scanner cimport BaseHaScanner | ||
|
||
|
||
|