Skip to content

Commit

Permalink
added python docstrings to most classes and methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jonashoechst committed Feb 10, 2022
1 parent 5de9e6d commit c81e16c
Show file tree
Hide file tree
Showing 8 changed files with 510 additions and 74 deletions.
10 changes: 10 additions & 0 deletions radiotracking/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"python.formatting.autopep8Args": [
"--max-line-length",
"200"
],
"python.linting.pylintEnabled": true,
"python.linting.pylintArgs": [
"--max-line-length=200"
]
}
183 changes: 160 additions & 23 deletions radiotracking/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,83 @@
logger = logging.getLogger(__name__)


def dB(val):
def dB(val: float):
"""
Convert a power value to dB.
"""
return 10 * np.log10(val)


def from_dB(dB):
def from_dB(dB: float):
"""Convert a dB value to power."""
return 10 ** (dB / 10)


class AbstractSignal(ABC):
ts: datetime.datetime
frequency: float
duration: datetime.timedelta
"""
Abstract class for a signal.
"""
header: List[str]
"""Header for the as_list() method."""

def __init__(self) -> None:
super().__init__()

self.ts: datetime.datetime
"""Timestamp of the signal."""
self.frequency: float
"""Frequency of the signal in Hz."""
self.duration: datetime.timedelta
"""Duration of the signal."""

@property
@abstractmethod
def as_list(self):
pass
def as_list(self) -> List:
"""
Return the signal as a list of values.
Returns
-------
typing.List[typing.Any]
"""

@property
def as_dict(self):
def as_dict(self) -> Dict:
"""
Return the signal as a dictionary.
Returns
-------
typing.Dict[str, typing.Any]
"""
return dict(zip(self.header, self.as_list))


class Signal(AbstractSignal):
max_dBW: float
avg_dBW: float
std_dB: float
snr_dB: float
device: str
"""
Signal detected on a single device.
Parameters
----------
device: str
The device that detected the signal.
ts: typing.Union[datetime.datetime, str]
The timestamp of the signal.
frequency: typing.Union[float, str]
The frequency of the signal.
duration: typing.Union[datetime.timedelta, float, str]
The duration of the signal.
max_dBW: typing.Union[float, str]
The maximum power of the signal.
avg_dBW: typing.Union[float, str]
The average power of the signal.
std_dB: typing.Union[float, str]
The standard deviation in power of the signal.
noise_dBW: typing.Union[float, str]
The noise level of the signal.
snr_dB: typing.Union[float, str]
The signal to noise ratio of the signal.
"""

def __init__(
self,
Expand All @@ -52,6 +99,8 @@ def __init__(
noise_dBW: Union[float, str],
snr_dB: Union[float, str],
):
super().__init__()

self.device = device
if isinstance(ts, datetime.datetime):
self.ts = ts
Expand All @@ -63,11 +112,16 @@ def __init__(
else:
self.duration = datetime.timedelta(seconds=float(duration))

self.max = float(max_dBW)
self.avg = float(avg_dBW)
self.std = float(std_dB)
self.noise = float(noise_dBW)
self.snr = float(snr_dB)
self.max: float = float(max_dBW)
"""The maximum power of the signal."""
self.avg: float = float(avg_dBW)
"""The average power of the signal."""
self.std: float = float(std_dB)
"""The standard deviation in power of the signal."""
self.noise: float = float(noise_dBW)
"""The noise level of the signal."""
self.snr: float = float(snr_dB)
"""The signal to noise ratio of the signal."""

header = [
"Device",
Expand Down Expand Up @@ -103,6 +157,23 @@ def __str__(self):


class MatchedSignal(AbstractSignal):
"""
Matched Signal detected on multiple devices.
Parameters
----------
devices: typing.List[str]
The devices that could have detected the signal.
ts: typing.Union[datetime.datetime, str]
The timestamp of the signal.
frequency: typing.Union[float, str]
The frequency of the signal.
duration: typing.Union[datetime.timedelta, float, str]
The duration of the signal.
avgs: typing.List[float]
The average powers detected on the available devices.
"""

def __init__(
self,
devices: List[str],
Expand All @@ -111,6 +182,7 @@ def __init__(
duration: Union[datetime.timedelta, float, str],
*avgs: float,
):
super().__init__()
self.devices = devices

if isinstance(ts, datetime.datetime):
Expand All @@ -126,7 +198,8 @@ def __init__(
self._avgs: List[float] = avgs

@property
def header(self):
def header(self) -> List[str]:
"""Header for the as_list() method."""
return [
"Time",
"Frequency",
Expand All @@ -135,7 +208,7 @@ def header(self):
]

@property
def as_list(self) -> list:
def as_list(self) -> List:
return [
self.ts,
self.frequency,
Expand All @@ -153,24 +226,61 @@ def __str__(self):


class MatchingSignal(MatchedSignal):
"""
Class for matching signals detected on multiple devices.
Parameters
----------
devices: typing.List[str]
The devices that are available to detect signals.
"""

def __init__(self, devices: List[str]):
self.devices = devices
self._sigs: Dict[str, Signal] = {}

@property
def duration(self):
def duration(self) -> datetime.timedelta:
"""
Duration of the matching signal based on the detected maximum.
Returns
-------
datetime.timedelta
"""
return max([sig.duration for sig in self._sigs.values()])

@property
def ts(self):
def ts(self) -> datetime.datetime:
"""
Timestamp of the matching signal based on the earliest detection.
Returns
-------
datetime.datetime
"""
return min([sig.ts for sig in self._sigs.values()])

@property
def frequency(self):
def frequency(self) -> float:
"""
Frequency of the matching signal based on median frequency.
Returns
-------
float
"""
return statistics.median([sig.frequency for sig in self._sigs.values()])

@property
def _avgs(self) -> List[float]:
"""
Average powers of the matching signal.
Returns
-------
typing.List[float]
"""
return [self._sigs[d].avg if d in self._sigs else None for d in self.devices]

def has_member(self,
Expand All @@ -179,6 +289,25 @@ def has_member(self,
bandwidth: float = 0,
duration_diff: Optional[datetime.timedelta] = None,
) -> bool:
"""
Checks if a Signal is part of this matching signal.
Parameters
----------
sig: radiotracking.Signal
The signal to check.
time_diff: datetime.timedelta
Allowed difference of the timestamp.
bandwidth: float
Allowed difference of the frequency.
duration_diff: datetime.timedelta
Allowed difference of the duration.
Returns
-------
bool
True if the signal is part of this matching signal.
"""

# if freq (including bw) out of range of freq
if sig.frequency - bandwidth / 2 > self.frequency:
Expand Down Expand Up @@ -207,6 +336,14 @@ def has_member(self,
return True

def add_member(self, sig: Signal):
"""
Adds a signal to the matching signal.
Parameters
----------
sig: radiotracking.Signal
The signal to add.
"""
if sig.device in self._sigs:
logger.warning(f"{sig} already contained in {self}")
if self._sigs[sig.device].avg < sig.avg:
Expand Down
39 changes: 37 additions & 2 deletions radiotracking/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@


class Runner:
"""
A class to represent a running instance of pyradiotracking.
"""
parser = ArgConfParser(
prog="radiotracking",
description="Detect signals of wildlife tracking systems with RTL SDR devices",
Expand Down Expand Up @@ -86,6 +89,23 @@ class Runner:
dashboard_options.add_argument("--dashboard-signals", help="number of signals to present", default=100, type=int)

def create_and_start(self, device: str, calibration_db: float, sdr_max_restart: int = None) -> SignalAnalyzer:
"""
Creates, starts and returns a signal analyzer thread.
Parameters
----------
device: str
device index or name
calibration_db: float
calibration gain (dB)
sdr_max_restart: int
max restart count per SDR device
Returns
-------
SignalAnalyzer: radiotracking.SignalAnalyzer
signal analyzer thread
"""
dargs = argparse.Namespace(**vars(self.args))
dargs.device = device
dargs.calibration_db = calibration_db
Expand All @@ -107,19 +127,28 @@ def create_and_start(self, device: str, calibration_db: float, sdr_max_restart:
return analyzer

def start_analyzers(self):
"""
Start all requested analyzer threads.
"""
if self.analyzers:
logger.critical("")
logger.info("Starting all analyzers")
for device, calibration_db in zip(self.args.device, self.args.calibration):
self.analyzers.append(self.create_and_start(device, calibration_db))

def stop_analyzers(self):
"""
Stop all analyzer threads.
"""
logger.info("Stopping all analyzers")
[a.kill() for a in self.analyzers]
[a.join() for a in self.analyzers]
self.analyzers = []

def check_analyzers(self):
"""
Check if all analyzer threads are still running.
"""
now = datetime.datetime.now()

# iterate the analyzer copy to allow for altering (restarting) analyzers
Expand Down Expand Up @@ -156,6 +185,9 @@ def check_analyzers(self):
self.analyzers.append(new_analyzer)

def terminate(self, sig):
"""
Terminate the application.
"""
logger.warning(f"Caught {signal.Signals(sig).name}, terminating {len(self.analyzers)} analyzers.")
self.running = False

Expand Down Expand Up @@ -249,11 +281,14 @@ def __init__(self):
self.schedule.append((start_s.at_time, stop_s.at_time))
logger.debug(f"Added {start_s.at_time}-{stop_s.at_time} to schedule")

except schedule.ScheduleError as e:
logger.error(f"{e}, please check configuration '{entry}'.")
except schedule.ScheduleError as error:
logger.error(f"{error}, please check configuration '{entry}'.")
exit(1)

def main(self):
"""
Run the main loop of the application.
"""
logger.warning("Running radiotracking...")

if self.dashboard:
Expand Down
Loading

0 comments on commit c81e16c

Please sign in to comment.