diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py new file mode 100644 index 000000000..ee7ad9cd6 --- /dev/null +++ b/benchmarks/timeseries/resampling.py @@ -0,0 +1,55 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Benchmark resampling.""" + +from datetime import datetime, timedelta, timezone +from timeit import timeit +from typing import Sequence + +from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries._resampling._resampler import ( + ResamplerConfig, + _ResamplingHelper, +) + + +def nop( # pylint: disable=unused-argument + samples: Sequence[Sample], resampling_period_s: float +) -> float: + """Return 0.0.""" + return 0.0 + + +def _benchmark_resampling_helper(resamples: int, samples: int) -> None: + """Benchmark the resampling helper.""" + helper = _ResamplingHelper( + ResamplerConfig( + resampling_period_s=1.0, + max_data_age_in_periods=3.0, + resampling_function=nop, + initial_buffer_len=samples * 3, + ) + ) + now = datetime.now(timezone.utc) + + def _do_work() -> None: + nonlocal now + for _n_resample in range(resamples): + for _n_sample in range(samples): + now = now + timedelta(seconds=1 / samples) + helper.add_sample(Sample(now, 0.0)) + helper.resample(now) + + print(timeit(_do_work, number=5)) + + +def _benchmark() -> None: + for resamples in [10, 100, 1000]: + for samples in [10, 100, 1000]: + print(f"{resamples=} {samples=}") + _benchmark_resampling_helper(resamples, samples) + + +if __name__ == "__main__": + _benchmark() diff --git a/examples/resampling.py b/examples/resampling.py index 684490a91..f045c2049 100644 --- a/examples/resampling.py +++ b/examples/resampling.py @@ -18,7 +18,8 @@ ) from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source +from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig +from 
frequenz.sdk.timeseries._resampling._resampler import Sink, Source HOST = "microgrid.sandbox.api.frequenz.io" PORT = 61060 @@ -65,7 +66,7 @@ async def run() -> None: # pylint: disable=too-many-locals channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_request_receiver, - resampling_period_s=1, + config=ResamplerConfig(resampling_period_s=1), ) components = await microgrid.get().api_client.components() @@ -104,7 +105,7 @@ async def run() -> None: # pylint: disable=too-many-locals # Create a channel to calculate an average for all the data average_chan = Broadcast[Sample]("average") - second_stage_resampler = Resampler(resampling_period_s=3.0) + second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0)) second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample) average_sender = average_chan.new_sender() diff --git a/pyproject.toml b/pyproject.toml index b0de607c3..389fae441 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "sympy >= 1.10.1, < 2", "toml >= 0.10", "tqdm >= 4.38.0, < 5", + "typing_extensions >= 4.4.0, < 5", "watchfiles >= 0.15.0", ] dynamic = [ "version" ] diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py index 875684daf..abc94a2aa 100644 --- a/src/frequenz/sdk/actor/__init__.py +++ b/src/frequenz/sdk/actor/__init__.py @@ -7,7 +7,7 @@ from ._config_managing import ConfigManagingActor from ._data_sourcing import ComponentMetricRequest, DataSourcingActor from ._decorator import actor -from ._resampling import ComponentMetricsResamplingActor +from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig __all__ = [ "ChannelRegistry", @@ -15,5 +15,6 @@ "ComponentMetricsResamplingActor", "ConfigManagingActor", "DataSourcingActor", + "ResamplerConfig", "actor", ] diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index 
c79e980d0..db9aa3e5b 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -14,12 +14,7 @@ from frequenz.sdk.util.asyncio import cancel_and_await from ..timeseries import Sample -from ..timeseries._resampling import ( - Resampler, - ResamplingError, - ResamplingFunction, - average, -) +from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError from ._channel_registry import ChannelRegistry from ._data_sourcing import ComponentMetricRequest from ._decorator import actor @@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], - resampling_period_s: float = 0.2, - max_data_age_in_periods: float = 3.0, - resampling_function: ResamplingFunction = average, + config: ResamplerConfig, ) -> None: """Initialize an instance. @@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments to subscribe to component metrics. resampling_request_receiver: The receiver to use to receive new resampmling subscription requests. - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. 
""" self._channel_registry: ChannelRegistry = channel_registry - self._resampling_period_s: float = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function self._data_sourcing_request_sender: Sender[ ComponentMetricRequest ] = data_sourcing_request_sender self._resampling_request_receiver: Receiver[ ComponentMetricRequest ] = resampling_request_receiver - self._resampler: Resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=max_data_age_in_periods, - resampling_function=resampling_function, - ) + self._resampler: Resampler = Resampler(config) self._active_req_channels: set[str] = set() async def _subscribe(self, request: ComponentMetricRequest) -> None: diff --git a/src/frequenz/sdk/timeseries/_base_types.py b/src/frequenz/sdk/timeseries/_base_types.py index 36a6e2bd5..6d54a35b3 100644 --- a/src/frequenz/sdk/timeseries/_base_types.py +++ b/src/frequenz/sdk/timeseries/_base_types.py @@ -3,12 +3,16 @@ """Timeseries basic types.""" -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import Optional -@dataclass(frozen=True) +# Ordering by timestamp is a bit arbitrary, and it is not always what might be +# wanted. We are using this order now because usually we need to do binary +# searches on sequences of samples, and the Python `bisect` module doesn't +# support providing a key until Python 3.10. +@dataclass(frozen=True, order=True) class Sample: """A measurement taken at a particular point in time. @@ -17,5 +21,8 @@ class Sample: coherent view on a group of component metrics for a particular timestamp. 
""" - timestamp: datetime - value: Optional[float] = None + timestamp: datetime = field(compare=True) + """The time when this sample was generated.""" + + value: Optional[float] = field(compare=False, default=None) + """The value of this sample.""" diff --git a/src/frequenz/sdk/timeseries/_resampling/__init__.py b/src/frequenz/sdk/timeseries/_resampling/__init__.py new file mode 100644 index 000000000..11ef0349b --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/__init__.py @@ -0,0 +1,21 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Timeseries resampling.""" + + +from ._resampler import ( + Resampler, + ResamplerConfig, + ResamplingError, + ResamplingFunction, + SourceStoppedError, +) + +__all__ = [ + "Resampler", + "ResamplerConfig", + "ResamplingError", + "ResamplingFunction", + "SourceStoppedError", +] diff --git a/src/frequenz/sdk/timeseries/_resampling/_buffer.py b/src/frequenz/sdk/timeseries/_resampling/_buffer.py new file mode 100644 index 000000000..8021cf331 --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/_buffer.py @@ -0,0 +1,262 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Timeseries resampler buffer handling.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Sequence, TypeVar, overload + +T = TypeVar("T") + + +@dataclass +class _Uninitialized: + """A sentinel class to mark uninitialized data in the buffer.""" + + +_UNINITIALIZED = _Uninitialized() +"""A uninitialized sentinel instance.""" + + +class _BufferView(Sequence[T]): + """A read-only view into a Buffer. + + This is a ring buffer with the following structure: + + * It is a fixed size buffer. + * If the buffer isn't full, elements not yet filled up are represented with + a `_UNINITIALIZED` instance. + * The `tail` pointer points to the first element in the view. + * The `head` pointer points past the last element in the view. 
+ * The ring buffer can, of course, wrap around, so `head` can be smaller than `tail`. + * When `head` == `tail`, the view is empty, unless `_is_full` is `True`, in + which case the view covers the whole buffer. + """ + + def __init__( + self, *, buffer: list[_Uninitialized | T], full: bool, tail: int, head: int + ) -> None: + """Create an instance. + + Args: + buffer: The internal buffer that needs to be viewed. + full: Whether this is a view on a full buffer. + tail: The index of the first element of the view. + head: The index one past the last element of the view. + """ + super().__init__() + self._buffer: list[_Uninitialized | T] = buffer + self._is_full: bool = full + self._tail: int = tail # Points to the oldest element in the view + self._head: int = head # Points one past the newest element in the view + + @property + def capacity(self) -> int: + """Return the capacity of the internal buffer. + + Returns: + The capacity of the internal buffer. + """ + return len(self._buffer) + + def _adjust(self, position: int, *, offset: int = 0) -> int: + """Adjust a position to point to a valid index in the buffer. + + An optional `offset` can be passed if the position needs to be also + updated using some offset. + + Args: + position: The position index to adjust. + offset: The offset to use to calculate the new position. + + Returns: + The adjusted position index. + """ + return (position + offset) % self.capacity + + def _index(self, /, index: int) -> T: + """Get the nth element in this view. + + Args: + index: The index of the element to get. + + Returns: + The element in the nth position.
+ """ + if index < 0: + index += len(self) + if index < 0: + raise IndexError("Buffer index out of range") + if index >= len(self): + raise IndexError("Buffer index out of range") + index = self._adjust(self._tail, offset=index) + item = self._buffer[index] + assert not isinstance(item, _Uninitialized) + return item + + def _slice(self, /, slice_: slice) -> Sequence[T]: + """Get a slice on this view. + + Args: + slice_: The parameters of the slice to get. + + Returns: + The slice on this view. + """ + start, stop, step = slice_.indices(len(self)) + # This is a bit expensive, but making all the calculation each time + # an item is accessed would also be expensive. This way the slice + # might be expensive to construct but then is super cheap to use, + # which should probably be the most common sense. We also avoid + # having to implement a lot of weird corner cases. + if step != 1: + new_slice: list[T] = [] + for i in range(start, stop, step): + new_slice.append(self[i]) + return new_slice + + # If we have a step of 1, then things are very simple, and we can + # use another view on the same buffer. This is also probably the + # most common slicing case. + + # If the requested size is empty, then just return an empty tuple + if start >= stop: + return () + + tail = self._adjust(self._tail, offset=start) + head = self._adjust(self._tail, offset=stop) + slice_is_full = stop - start == self.capacity + return _BufferView( + buffer=self._buffer, full=slice_is_full, head=head, tail=tail + ) + + @overload + def __getitem__(self, index: int) -> T: + ... + + @overload + def __getitem__(self, index: slice) -> Sequence[T]: + ... + + def __getitem__(self, index: int | slice) -> T | Sequence[T]: + """Get an item or slice on this view. + + Args: + index: The index of the element or parameters of the slice to get. + + Returns: + The element or slice on this view. 
+ """ + if isinstance(index, slice): + return self._slice(index) + + return self._index(index) + + def __len__(self) -> int: + """Get the length of this view. + + Returns: + The length of this view. + """ + if self._is_full: + return self.capacity + if self._head == self._tail: + return 0 + offset = self.capacity if self._head < self._tail else 0 + return offset + self._head - self._tail + + def __repr__(self) -> str: + """Get the string representation of this view. + + Returns: + The string representation of this view. + """ + items = [] + if self._tail != self._head or self._is_full: + tail = self._tail + for _ in range(len(self)): + items.append(self._buffer[tail]) + tail = self._adjust(tail, offset=1) + return f"{self.__class__.__name__}({items})" + + def _debug_repr(self) -> str: + """Get the debug string representation of this view. + + Returns: + The debug string representation of this view. + """ + return ( + f"{self.__class__.__name__}(buffer={self._buffer!r}, " + f"full={self._is_full!r}, tail={self._tail!r}, head={self._head!r})" + ) + + def __eq__(self, __o: object) -> bool: + """Compare this view to another object. + + Returns: + `True` if the other object is also a `Sequence` and all the + elements compare equal, `False` otherwise. + """ + if isinstance(__o, Sequence): + len_self = len(self) + if len_self != len(__o): + return False + for i in range(len_self): + if self[i] != __o[i]: + return False + return True + return super().__eq__(__o) + + +class Buffer(_BufferView[T]): + """A push-only, fixed-size ring buffer. + + Elements can be `push`ed to the buffer, when the buffer gets full, newer + elements will replace older elements. The buffer can be `clear`ed. + """ + + def __init__(self, /, capacity: int, initial_values: Sequence[T] = ()) -> None: + """Create an instance. + + Args: + capacity: The capacity of the buffer. + initial_values: The initial values to fill the buffer with.
If it + has more items than the capacity, then only the last items will + be added. + """ + if capacity <= 0: + raise ValueError("The buffer capacity must be larger than 0") + if len(initial_values) > capacity: + initial_values = initial_values[-capacity:] + buffer: list[_Uninitialized | T] = list(initial_values) + is_full = False + if len(initial_values) < capacity: + buffer += [_UNINITIALIZED] * (capacity - len(initial_values)) + head = len(initial_values) + else: + is_full = True + head = 0 + super().__init__(buffer=buffer, full=is_full, tail=0, head=head) + + def clear(self) -> None: + """Clear the buffer.""" + self._head = 0 + self._tail = 0 + self._is_full = False + + def push(self, element: T) -> None: + """Push a new element to the buffer. + + If the buffer is full, the oldest element will be dropped. + + Args: + element: Element to push. + """ + self._buffer[self._head] = element + self._head = self._adjust(self._head, offset=1) + if self._is_full: + self._tail = self._adjust(self._tail, offset=1) + if self._head == self._tail and not self._is_full: + self._is_full = True diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py similarity index 73% rename from src/frequenz/sdk/timeseries/_resampling.py rename to src/frequenz/sdk/timeseries/_resampling/_resampler.py index d550e3d0f..1f4240be3 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -6,18 +6,25 @@ from __future__ import annotations import asyncio -import logging import math -from collections import deque +from bisect import bisect +from dataclasses import dataclass from datetime import datetime, timedelta from typing import AsyncIterator, Callable, Coroutine, Sequence from frequenz.channels.util import Timer -from ..util.asyncio import cancel_and_await -from . import Sample +from ...util.asyncio import cancel_and_await +from .. 
import Sample +from ._buffer import Buffer -_logger = logging.Logger(__name__) +DEFAULT_BUFFER_LEN_INIT = 16 +"""Default initial buffer length. + +Buffers will be created initially with this length, but they could grow or +shrink depending on the source characteristics, like sampling rate, to make +sure all the requested past sampling periods can be stored. +""" Source = AsyncIterator[Sample] @@ -85,6 +92,42 @@ def average(samples: Sequence[Sample], resampling_period_s: float) -> float: return sum(values) / len(values) +@dataclass(frozen=True) +class ResamplerConfig: + """Resampler configuration.""" + + resampling_period_s: float + """The resampling period in seconds. + + This is the time that passes between two consecutive resampled samples. + """ + + max_data_age_in_periods: float = 3.0 + """The maximum age a sample can have to be considered *relevant* for resampling. + + Expressed in number of resampling periods. For example if + `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, then data + older than 3*2 = 6 seconds will be discarded when creating a new sample and + never passed to the resampling function. + """ + + resampling_function: ResamplingFunction = average + """The resampling function. + + This function will be applied to the sequence of relevant samples at + a given time. The result of the function is what is sent as the resampled + value. + """ + + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT + """The initial length of the resampling buffer. + + The buffer could grow or shrink depending on the source characteristics, + like sampling rate, to make sure all the requested past sampling periods + can be stored. + """ + + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" @@ -166,34 +209,24 @@ class Resampler: no way to produce meaningful samples with the available data.
""" - def __init__( - self, - *, - resampling_period_s: float, - resampling_function: ResamplingFunction = average, - max_data_age_in_periods: float = 3.0, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. """ - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function + self._config = config self._resamplers: dict[Source, _StreamingHelper] = {} - self._timer: Timer = Timer(self._resampling_period_s) + self._timer: Timer = Timer(config.resampling_period_s) + + @property + def config(self) -> ResamplerConfig: + """Get the resampler configuration. + + Returns: + The resampler configuration. 
+ """ + return self._config async def stop(self) -> None: """Cancel all receiving tasks.""" @@ -214,15 +247,7 @@ def add_timeseries(self, source: Source, sink: Sink) -> bool: if source in self._resamplers: return False - resampler = _StreamingHelper( - _ResamplingHelper( - resampling_period_s=self._resampling_period_s, - max_data_age_in_periods=self._max_data_age_in_periods, - resampling_function=self._resampling_function, - ), - source, - sink, - ) + resampler = _StreamingHelper(_ResamplingHelper(self._config), source, sink) self._resamplers[source] = resampler return True @@ -288,33 +313,14 @@ class _ResamplingHelper: when calling the `resample()` method. All older samples are discarded. """ - def __init__( - self, - *, - resampling_period_s: float, - max_data_age_in_periods: float, - resampling_function: ResamplingFunction, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than 3*2 = 6 secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - relevant samples at a given time. The result of the function is - what is sent as the resampled data. + config: The configuration for the resampler. 
""" - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._buffer: deque[Sample] = deque() - self._resampling_function: ResamplingFunction = resampling_function + self._config = config + self._buffer: Buffer = Buffer(config.initial_buffer_len) def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. @@ -322,31 +328,7 @@ def add_sample(self, sample: Sample) -> None: Args: sample: The sample to be added to the buffer. """ - self._buffer.append(sample) - - def _remove_outdated_samples(self, threshold: datetime) -> None: - """Remove samples that are older than the provided time threshold. - - It is assumed that items in the buffer are in a sorted order (ascending order - by timestamp). - - The removal works by traversing the buffer starting from the oldest sample - (smallest timestamp) and comparing sample's timestamp with the threshold. - If the sample's threshold is smaller than `threshold`, it means that the - sample is outdated and it is removed from the buffer. This continues until - the first sample that is with timestamp greater or equal to `threshold` is - encountered, then buffer is considered up to date. - - Args: - threshold: samples whose timestamp is older than the threshold are - considered outdated and should be remove from the buffer - """ - while self._buffer: - sample: Sample = self._buffer[0] - if sample.timestamp > threshold: - return - - self._buffer.popleft() + self._buffer.push(sample) def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -360,15 +342,20 @@ def resample(self, timestamp: datetime) -> Sample: If there are no *relevant* samples, then the new sample will have `None` as `value`. 
""" - threshold = timestamp - timedelta( - seconds=self._max_data_age_in_periods * self._resampling_period_s + conf = self._config + minimum_relevant_timestamp = timestamp - timedelta( + seconds=conf.resampling_period_s * conf.max_data_age_in_periods ) - self._remove_outdated_samples(threshold=threshold) - + # We need to pass a dummy Sample to bisect because it only support + # specifying a key extraction function in Python 3.10, so we need to + # compare samples at the moment. + cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None)) + # pylint: disable=fixme + relevant_samples = self._buffer[cut_index:] value = ( - None - if not self._buffer - else self._resampling_function(self._buffer, self._resampling_period_s) + conf.resampling_function(relevant_samples, conf.resampling_period_s) + if relevant_samples + else None ) return Sample(timestamp, value) @@ -392,7 +379,7 @@ def __init__( self._helper: _ResamplingHelper = helper self._source: Source = source self._sink: Sink = sink - self._receiving_task: asyncio.Task = asyncio.create_task( + self._receiving_task: asyncio.Task[None] = asyncio.create_task( self._receive_samples() ) diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 102a2d71d..af5439995 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -16,6 +16,7 @@ ChannelRegistry, ComponentMetricRequest, ComponentMetricsResamplingActor, + ResamplerConfig, ) from frequenz.sdk.microgrid.component import ComponentMetricId from frequenz.sdk.timeseries import Sample @@ -123,8 +124,10 @@ async def test_single_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( @@ -167,8 +170,10 @@ async def 
test_duplicate_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index eebaaabbc..106ea0b56 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -27,6 +27,7 @@ ComponentMetricRequest, ComponentMetricsResamplingActor, DataSourcingActor, + ResamplerConfig, ) from tests.microgrid import mock_api @@ -236,7 +237,7 @@ async def _init_client_and_actors( channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_actor_request_receiver, - resampling_period_s=0.1, + config=ResamplerConfig(resampling_period_s=0.1), ) return (resampling_actor_request_sender, channel_registry) diff --git a/tests/timeseries/resampling/__init__.py b/tests/timeseries/resampling/__init__.py new file mode 100644 index 000000000..3e762810b --- /dev/null +++ b/tests/timeseries/resampling/__init__.py @@ -0,0 +1,4 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Frequenz Python SDK Tests.""" diff --git a/tests/timeseries/resampling/test_buffer.py b/tests/timeseries/resampling/test_buffer.py new file mode 100644 index 000000000..07f6e89bd --- /dev/null +++ b/tests/timeseries/resampling/test_buffer.py @@ -0,0 +1,177 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Tests for the resampling buffer.""" + +from __future__ import annotations + +from typing import Optional + +import pytest + +from frequenz.sdk.timeseries._resampling._buffer import Buffer + + +def test_buffer_init_zero_capacity() -> None: + """Test a buffer
with zero capacity can't be created.""" + with pytest.raises(ValueError): + _ = Buffer[int](0) + + +def test_buffer_init_empty() -> None: + """Test an empty buffer is empty and can be pushed to.""" + buffer = Buffer[int](2) + + assert not buffer + assert buffer == [] # pylint: disable=use-implicit-booleaness-not-comparison + assert len(buffer) == 0 + for i in range(-5, 5): + with pytest.raises(IndexError): + _ = buffer[i] + + buffer.push(11) + assert buffer == [11] + + buffer.push(15) + assert buffer == [11, 15] + + buffer.push(25) + assert buffer == [15, 25] + + +def test_buffer_init_full() -> None: + """Test a full buffer can be created, pushed to, and drops old values.""" + init = (3, 2, 6, 10, 9) + buffer = Buffer[int](5, init) + + assert buffer == init + assert len(buffer) == len(init) + for i in range(5): + assert buffer[i] == init[i] + for i in range(-1, -6, -1): + assert buffer[i] == init[i] + with pytest.raises(IndexError): + _ = buffer[-6] + with pytest.raises(IndexError): + _ = buffer[5] + + buffer.push(11) + assert buffer == [2, 6, 10, 9, 11] + assert buffer[4] == 11 + with pytest.raises(IndexError): + _ = buffer[-6] + with pytest.raises(IndexError): + _ = buffer[5] + + +def test_buffer_init_partial() -> None: + """Test a partially initialized buffer. + + Test it can be created, pushed to, grows and eventually drops old values.
+ """ + init = (2, 9) + buffer = Buffer[int](4, init) + + assert buffer == init + assert len(buffer) == len(init) + with pytest.raises(IndexError): + _ = buffer[-3] + with pytest.raises(IndexError): + _ = buffer[2] + for i in range(2): + assert buffer[i] == init[i] + for i in range(-1, -3, -1): + assert buffer[i] == init[i] + + buffer.push(11) + assert buffer == [2, 9, 11] + + buffer.push(15) + assert buffer == [2, 9, 11, 15] + + buffer.push(25) + assert buffer == [9, 11, 15, 25] + + +def test_buffer_push() -> None: + """Test a buffer can be pushed to.""" + buffer = Buffer[int](2) + + with pytest.raises(IndexError): + _ = buffer[0] + with pytest.raises(IndexError): + _ = buffer[1] + with pytest.raises(IndexError): + _ = buffer[2] + + buffer.push(10) + assert len(buffer) == 1 + assert buffer == [10] + assert buffer[0] == 10 + with pytest.raises(IndexError): + _ = buffer[2] + + buffer.push(20) + assert len(buffer) == 2 + assert buffer == [10, 20] + assert buffer[0] == 10 + assert buffer[1] == 20 + with pytest.raises(IndexError): + _ = buffer[2] + + buffer.push(5) + assert len(buffer) == 2 + assert buffer == [20, 5] + assert buffer[0] == 20 + assert buffer[1] == 5 + with pytest.raises(IndexError): + _ = buffer[2] + + +def test_buffer_clear() -> None: + """Test a buffer can be cleared correctly.""" + buffer = Buffer[int](2, (1, 2)) + + buffer.clear() + assert not buffer + assert buffer == [] # pylint: disable=use-implicit-booleaness-not-comparison + assert len(buffer) == 0 + with pytest.raises(IndexError): + _ = buffer[0] + with pytest.raises(IndexError): + _ = buffer[1] + with pytest.raises(IndexError): + _ = buffer[2] + + +@pytest.mark.parametrize("step", [None, 1, 2, 5, -5, -2, -1]) +@pytest.mark.parametrize("stop", [0, None, 1, 3, 5, -5, -3, -1]) +@pytest.mark.parametrize("start", [0, None, 1, 3, 5, -5, -3, -1]) +def test_slice(start: Optional[int], stop: Optional[int], step: Optional[int]) -> None: + """Test slicing operations.""" + init = (3, 2, 6, 10, 9) + 
buffer = Buffer[int](5, init) + + init_slice = init[start:stop:step] + slice_ = buffer[start:stop:step] + + assert len(slice_) == len(init_slice) + assert list(slice_) == list(init_slice) + for i in range(3): + with pytest.raises(IndexError): + _ = init_slice[-len(init_slice) - i - 1] + with pytest.raises(IndexError): + # For some reason pylint thinks slice_ is unsubscriptable, but the + # type is properly inferred as Sequence[int], so it looks like + # a pylint bug + # pylint: disable=unsubscriptable-object + _ = slice_[-len(init_slice) - i - 1] + with pytest.raises(IndexError): + # pylint: disable=unsubscriptable-object + _ = slice_[len(init_slice) + i] + for i in range(len(init_slice)): # pylint: disable=consider-using-enumerate + # pylint: disable=unsubscriptable-object + assert slice_[i] == init_slice[i] + for i in range(-1, -len(init_slice) - 1, -1): + # pylint: disable=unsubscriptable-object + assert slice_[i] == init_slice[i] diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index b81d8f79d..55014b49d 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -17,12 +17,12 @@ from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._resampling import ( Resampler, + ResamplerConfig, ResamplingError, ResamplingFunction, - Sink, - Source, SourceStoppedError, ) +from frequenz.sdk.timeseries._resampling._resampler import Sink, Source from ..utils import a_sequence @@ -90,9 +90,11 @@ async def test_resampling_with_one_window( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -180,9 +182,11 @@ async def 
test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.5, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.5, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -311,9 +315,11 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -442,9 +448,11 @@ async def test_receiving_stopped_resampling_error( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -499,9 +507,11 @@ async def test_receiving_resampling_error(fake_time: time_machine.Coordinates) - spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) class TestException(Exception):