From dc7a4028cac27ad5b8684f498b36f6fc59279bc8 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 21 Dec 2022 12:46:47 -0300 Subject: [PATCH 1/5] Add typing_extensions to the dependencies Signed-off-by: Leandro Lucarella --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b0de607c3..389fae441 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "sympy >= 1.10.1, < 2", "toml >= 0.10", "tqdm >= 4.38.0, < 5", + "typing_extensions >= 4.4.0, < 5", "watchfiles >= 0.15.0", ] dynamic = [ "version" ] From 9b2599597d0a7a5741b9fc045b88749105589e4a Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 7 Dec 2022 12:27:49 +0100 Subject: [PATCH 2/5] Move resampler configuration to its own class This makes user code a little bit more verbose but makes the code much more maintainable, as we avoid having to copy and pass around a lot of configuration variables that are only (or mostly) relevant to the internal resampling class. It also removes a lot of documentation duplication that can easily get out of sync and cause confusion. And we'll add quite a few more configuration variables in subsequent commits, which would just exacerbate the above-mentioned issues without a config class. Signed-off-by: Leandro Lucarella --- examples/resampling.py | 6 +- src/frequenz/sdk/actor/__init__.py | 3 +- src/frequenz/sdk/actor/_resampling.py | 33 +------ src/frequenz/sdk/timeseries/_resampling.py | 107 ++++++++++----------- tests/actor/test_resampling.py | 13 ++- tests/timeseries/mock_microgrid.py | 3 +- tests/timeseries/test_resampling.py | 41 +++++--- 7 files changed, 97 insertions(+), 109 deletions(-) diff --git a/examples/resampling.py b/examples/resampling.py index 684490a91..6412a240d 100644 --- a/examples/resampling.py +++ b/examples/resampling.py @@ -18,7 +18,7 @@ ) from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source +from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source HOST = "microgrid.sandbox.api.frequenz.io" PORT = 61060 @@ -65,7 +65,7 @@ async def run() -> None: # pylint: disable=too-many-locals channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_request_receiver, - resampling_period_s=1, + config=ResamplerConfig(resampling_period_s=1), ) components = await microgrid.get().api_client.components() @@ -104,7 +104,7 @@ async def run() -> None: # pylint: disable=too-many-locals # Create a channel to calculate an average for all the data average_chan = Broadcast[Sample]("average") - second_stage_resampler = Resampler(resampling_period_s=3.0) + second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0)) second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample) average_sender = average_chan.new_sender() diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py index 875684daf..abc94a2aa 100644 --- a/src/frequenz/sdk/actor/__init__.py +++ b/src/frequenz/sdk/actor/__init__.py @@ -7,7 +7,7 @@ from ._config_managing import ConfigManagingActor from ._data_sourcing import ComponentMetricRequest, DataSourcingActor from ._decorator import actor -from ._resampling import ComponentMetricsResamplingActor +from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig __all__ = [
"ChannelRegistry", @@ -15,5 +15,6 @@ "ComponentMetricsResamplingActor", "ConfigManagingActor", "DataSourcingActor", + "ResamplerConfig", "actor", ] diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index c79e980d0..db9aa3e5b 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -14,12 +14,7 @@ from frequenz.sdk.util.asyncio import cancel_and_await from ..timeseries import Sample -from ..timeseries._resampling import ( - Resampler, - ResamplingError, - ResamplingFunction, - average, -) +from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError from ._channel_registry import ChannelRegistry from ._data_sourcing import ComponentMetricRequest from ._decorator import actor @@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], - resampling_period_s: float = 0.2, - max_data_age_in_periods: float = 3.0, - resampling_function: ResamplingFunction = average, + config: ResamplerConfig, ) -> None: """Initialize an instance. @@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments to subscribe to component metrics. resampling_request_receiver: The receiver to use to receive new resampmling subscription requests. - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. 
""" self._channel_registry: ChannelRegistry = channel_registry - self._resampling_period_s: float = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function self._data_sourcing_request_sender: Sender[ ComponentMetricRequest ] = data_sourcing_request_sender self._resampling_request_receiver: Receiver[ ComponentMetricRequest ] = resampling_request_receiver - self._resampler: Resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=max_data_age_in_periods, - resampling_function=resampling_function, - ) + self._resampler: Resampler = Resampler(config) self._active_req_channels: set[str] = set() async def _subscribe(self, request: ComponentMetricRequest) -> None: diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index d550e3d0f..4abd54f00 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -9,6 +9,7 @@ import logging import math from collections import deque +from dataclasses import dataclass from datetime import datetime, timedelta from typing import AsyncIterator, Callable, Coroutine, Sequence @@ -85,6 +86,34 @@ def average(samples: Sequence[Sample], resampling_period_s: float) -> float: return sum(values) / len(values) +@dataclass(frozen=True) +class ResamplerConfig: + """Resampler configuration.""" + + resampling_period_s: float + """The resapmling period in seconds. + + This is the time it passes between resampled data should be calculated. + """ + + max_data_age_in_periods: float = 3.0 + """The maximum age a sample can have to be considered *relevant* for resampling. + + Expressed in number of resampling periods. For example if + `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, then data + older than 3*2 = 6 secods will be discarded when creating a new sample and + never passed to the resampling function. + """ + + resampling_function: ResamplingFunction = average + """The resampling function. + + This function will be applied to the sequence of relevant samples at + a given time. The result of the function is what is sent as the resampled + value. + """ + + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" @@ -166,34 +195,24 @@ class Resampler: no way to produce meaningful samples with the available data. """ - def __init__( - self, - *, - resampling_period_s: float, - resampling_function: ResamplingFunction = average, - max_data_age_in_periods: float = 3.0, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. 
""" - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function + self._config = config self._resamplers: dict[Source, _StreamingHelper] = {} - self._timer: Timer = Timer(self._resampling_period_s) + self._timer: Timer = Timer(config.resampling_period_s) + + @property + def config(self) -> ResamplerConfig: + """Get the resampler configuration. + + Returns: + The resampler configuration. + """ + return self._config async def stop(self) -> None: """Cancel all receiving tasks.""" @@ -214,15 +233,7 @@ def add_timeseries(self, source: Source, sink: Sink) -> bool: if source in self._resamplers: return False - resampler = _StreamingHelper( - _ResamplingHelper( - resampling_period_s=self._resampling_period_s, - max_data_age_in_periods=self._max_data_age_in_periods, - resampling_function=self._resampling_function, - ), - source, - sink, - ) + resampler = _StreamingHelper(_ResamplingHelper(self._config), source, sink) self._resamplers[source] = resampler return True @@ -288,33 +299,14 @@ class _ResamplingHelper: when calling the `resample()` method. All older samples are discarded. """ - def __init__( - self, - *, - resampling_period_s: float, - max_data_age_in_periods: float, - resampling_function: ResamplingFunction, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than 3*2 = 6 secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - relevant samples at a given time. The result of the function is - what is sent as the resampled data. + config: The configuration for the resampler. """ - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods + self._config = config self._buffer: deque[Sample] = deque() - self._resampling_function: ResamplingFunction = resampling_function def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. @@ -361,14 +353,17 @@ def resample(self, timestamp: datetime) -> Sample: have `None` as `value`. 
""" threshold = timestamp - timedelta( - seconds=self._max_data_age_in_periods * self._resampling_period_s + seconds=self._config.max_data_age_in_periods + * self._config.resampling_period_s ) self._remove_outdated_samples(threshold=threshold) value = ( None if not self._buffer - else self._resampling_function(self._buffer, self._resampling_period_s) + else self._config.resampling_function( + self._buffer, self._config.resampling_period_s + ) ) return Sample(timestamp, value) diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 102a2d71d..af5439995 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -16,6 +16,7 @@ ChannelRegistry, ComponentMetricRequest, ComponentMetricsResamplingActor, + ResamplerConfig, ) from frequenz.sdk.microgrid.component import ComponentMetricId from frequenz.sdk.timeseries import Sample @@ -123,8 +124,10 @@ async def test_single_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( @@ -167,8 +170,10 @@ async def test_duplicate_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index eebaaabbc..106ea0b56 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -27,6 +27,7 @@ ComponentMetricRequest, ComponentMetricsResamplingActor, DataSourcingActor, + ResamplerConfig, ) from tests.microgrid import mock_api @@ -236,7 +237,7 @@ async def _init_client_and_actors( channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_actor_request_receiver, - resampling_period_s=0.1, + config=ResamplerConfig(resampling_period_s=0.1), ) return (resampling_actor_request_sender, channel_registry) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index b81d8f79d..61b0fdb28 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -17,6 +17,7 @@ from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._resampling import ( Resampler, + ResamplerConfig, ResamplingError, ResamplingFunction, Sink, @@ -90,9 +91,11 @@ async def test_resampling_with_one_window( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -180,9 +183,11 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.5, - resampling_function=resampling_fun_mock, + ResamplerConfig( + 
resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.5, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -311,9 +316,11 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -442,9 +449,11 @@ async def test_receiving_stopped_resampling_error( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -499,9 +508,11 @@ async def test_receiving_resampling_error(fake_time: time_machine.Coordinates) - spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) class TestException(Exception): From b7c045d68e05803a8197bdcdf176a0e7782bd6cd Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 6 Dec 2022 16:42:50 +0100 Subject: [PATCH 3/5] Add a benchmark for the resampler Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 51 +++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 benchmarks/timeseries/resampling.py diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py new file mode 100644 index 000000000..cbf89f6a0 --- /dev/null +++ b/benchmarks/timeseries/resampling.py @@ -0,0 +1,51 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Benchmark resampling.""" + +from datetime import datetime, timedelta, timezone +from timeit import timeit +from typing import Sequence + +from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper + + +def nop( # pylint: disable=unused-argument + samples: Sequence[Sample], resampling_period_s: float +) -> float: + """Return 0.0.""" + return 0.0 + + +def _benchmark_resampling_helper(resamples: int, samples: int) -> None: + """Benchmark the resampling helper.""" + helper = _ResamplingHelper( + ResamplerConfig( + resampling_period_s=1.0, + max_data_age_in_periods=3.0, + resampling_function=nop, + ) + ) + now = datetime.now(timezone.utc) + + def _do_work() -> None: + nonlocal now + for _n_resample in range(resamples): + for _n_sample in range(samples): + now = now + timedelta(seconds=1 / samples) + helper.add_sample(Sample(now, 0.0)) + helper.resample(now) + + print(timeit(_do_work, number=5)) + + +def _benchmark() -> None: + for resamples in [10, 100, 1000]: + for samples in [10, 100, 1000]: + print(f"{resamples=} {samples=}") + _benchmark_resampling_helper(resamples, samples) + + +if __name__ == "__main__": + _benchmark() From 91eed972f3bb4a40a54ed10183d0fed9d32260a2 Mon Sep 17 00:00:00 2001 From: Leandro 
Lucarella Date: Wed, 7 Dec 2022 15:15:20 +0100 Subject: [PATCH 4/5] Add initial buffer length to the resampler This commit makes the resampler use a proper ring buffer instead of an unbounded buffer that only gets cleared when a resampling is done (which could easily lead to memory issues if the input sampling rate is much higher than the resampling rate). This also improves the performance of the resampling by an average of 20% (on my local machine, your mileage may vary), even though the current implementation now needs to copy the buffer when it is passed to the resampling function. Here are the results of running benchmarks/timeseries/resampling.py for the old implementation, the new implementation (without doing the bisection and copying), and the new complete implementation with its currently fixable inefficiencies:

                                     OLD                      NEW                      NEW WITH BISECT
    resamples=10    samples=10       0.0008896420185919851    0.00062773801619187      0.0008012260077521205
    resamples=10    samples=100      0.007817161997081712     0.005761806009104475     0.006307974021183327
    resamples=10    samples=1000     0.07768873398890719      0.05851042701397091      0.0604277040110901
    resamples=100   samples=10       0.008742492995224893     0.0062527229893021286    0.00808265499654226
    resamples=100   samples=100      0.07808284999919124      0.057997508003609255     0.0624066719901748
    resamples=100   samples=1000     0.782658567011822        0.5870920980232768       0.6098103950207587
    resamples=1000  samples=10       0.08764891701866873      0.062448524025967345     0.07815460601705126
    resamples=1000  samples=100      0.78426024899818         0.5858371119829826       0.6357307220168877
    resamples=1000  samples=1000     7.513815971993608        5.984694316983223        6.42200836900156
    Average improvement:             35.3%                    19.7%

This commit sadly introduces a nasty hack: to be able to bisect the buffer, we need to make Sample ordered by timestamp, because bisect doesn't support a key extraction function before Python 3.10.
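A minimal, self-contained sketch of that ordering hack (the local Sample here mirrors the dataclass changed in this patch and is only an illustration, not the SDK class; the timestamps and values are arbitrary examples):

    from bisect import bisect
    from dataclasses import dataclass, field
    from datetime import datetime, timedelta, timezone
    from typing import Optional


    # Ordering uses only the timestamp; the value is excluded from comparisons,
    # so a dummy Sample can act as a search key for bisect.
    @dataclass(frozen=True, order=True)
    class Sample:
        timestamp: datetime = field(compare=True)
        value: Optional[float] = field(compare=False, default=None)


    now = datetime.now(timezone.utc)
    buffer = [Sample(now + timedelta(seconds=i), float(i)) for i in range(10)]

    # Discard samples older than 6 seconds before the newest one: bisect finds
    # the cut-off index by comparing a dummy Sample against the buffered ones.
    threshold = buffer[-1].timestamp - timedelta(seconds=6)
    cut_index = bisect(buffer, Sample(threshold, None))
    relevant = buffer[cut_index:]
    assert all(sample.timestamp > threshold for sample in relevant)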
Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 1 + src/frequenz/sdk/timeseries/_base_types.py | 15 +++-- src/frequenz/sdk/timeseries/_resampling.py | 70 +++++++++++----------- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index cbf89f6a0..6e85708dc 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -25,6 +25,7 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None: resampling_period_s=1.0, max_data_age_in_periods=3.0, resampling_function=nop, + initial_buffer_len=samples * 3, ) ) now = datetime.now(timezone.utc) diff --git a/src/frequenz/sdk/timeseries/_base_types.py b/src/frequenz/sdk/timeseries/_base_types.py index 36a6e2bd5..6d54a35b3 100644 --- a/src/frequenz/sdk/timeseries/_base_types.py +++ b/src/frequenz/sdk/timeseries/_base_types.py @@ -3,12 +3,16 @@ """Timeseries basic types.""" -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import Optional -@dataclass(frozen=True) +# Ordering by timestamp is a bit arbitrary, and it is not always what might be +# wanted. We are using this order now because usually we need to do binary +# searches on sequences of samples, and the Python `bisect` module doesn't +# support providing a key until Python 3.10. +@dataclass(frozen=True, order=True) class Sample: """A measurement taken at a particular point in time. @@ -17,5 +21,8 @@ class Sample: coherent view on a group of component metrics for a particular timestamp. """ - timestamp: datetime - value: Optional[float] = None + timestamp: datetime = field(compare=True) + """The time when this sample was generated.""" + + value: Optional[float] = field(compare=False, default=None) + """The value of this sample.""" diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 4abd54f00..3d060b220 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -6,8 +6,10 @@ from __future__ import annotations import asyncio +import itertools import logging import math +from bisect import bisect from collections import deque from dataclasses import dataclass from datetime import datetime, timedelta @@ -21,6 +23,15 @@ _logger = logging.Logger(__name__) +DEFAULT_BUFFER_LEN_INIT = 16 +"""Default initial buffer length. + +Buffers will be created initially with this length, but they could grow or +shrink depending on the source characteristics, like sampling rate, to make +sure all the requested past sampling periods can be stored. +""" + + Source = AsyncIterator[Sample] """A source for a timeseries. @@ -113,6 +124,14 @@ class ResamplerConfig: value. """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT + """The initial length of the resampling buffer. + + The buffer could grow or shrink depending on the source characteristics, + like sampling rate, to make sure all the requested past sampling periods + can be stored. + """ + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" @@ -306,7 +325,7 @@ def __init__(self, config: ResamplerConfig) -> None: config: The configuration for the resampler. """ self._config = config - self._buffer: deque[Sample] = deque() + self._buffer: deque[Sample] = deque(maxlen=config.initial_buffer_len) def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. 
@@ -316,30 +335,6 @@ def add_sample(self, sample: Sample) -> None: """ self._buffer.append(sample) - def _remove_outdated_samples(self, threshold: datetime) -> None: - """Remove samples that are older than the provided time threshold. - - It is assumed that items in the buffer are in a sorted order (ascending order - by timestamp). - - The removal works by traversing the buffer starting from the oldest sample - (smallest timestamp) and comparing sample's timestamp with the threshold. - If the sample's threshold is smaller than `threshold`, it means that the - sample is outdated and it is removed from the buffer. This continues until - the first sample that is with timestamp greater or equal to `threshold` is - encountered, then buffer is considered up to date. - - Args: - threshold: samples whose timestamp is older than the threshold are - considered outdated and should be remove from the buffer - """ - while self._buffer: - sample: Sample = self._buffer[0] - if sample.timestamp > threshold: - return - - self._buffer.popleft() - def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -352,18 +347,23 @@ def resample(self, timestamp: datetime) -> Sample: If there are no *relevant* samples, then the new sample will have `None` as `value`. """ - threshold = timestamp - timedelta( - seconds=self._config.max_data_age_in_periods - * self._config.resampling_period_s + conf = self._config + minimum_relevant_timestamp = timestamp - timedelta( + seconds=conf.resampling_period_s * conf.max_data_age_in_periods ) - self._remove_outdated_samples(threshold=threshold) - + # We need to pass a dummy Sample to bisect because it only support + # specifying a key extraction function in Python 3.10, so we need to + # compare samples at the moment. + cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None)) + # pylint: disable=fixme + # FIXME: This is far from efficient, but we don't want to start new + # ring buffer implementation here that uses a list to overcome the + # deque limitation of not being able to get slices + relevant_samples = list(itertools.islice(self._buffer, cut_index, None)) value = ( - None - if not self._buffer - else self._config.resampling_function( - self._buffer, self._config.resampling_period_s - ) + conf.resampling_function(relevant_samples, conf.resampling_period_s) + if relevant_samples + else None ) return Sample(timestamp, value) From 55916591a2659690b88639fa05684e5d8b8136e3 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 12 Dec 2022 15:09:35 +0100 Subject: [PATCH 5/5] Add a custom resampling buffer The resampling buffer uses a list as a backing container to implement a push-only ring buffer that can be also easily sliced. 
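For illustration, a small usage sketch of the Buffer class introduced below in _buffer.py (not part of the patch; the pushed values are arbitrary examples): pushing past the capacity drops the oldest element, and step-1 slices are cheap views into the same backing list rather than copies.

    from frequenz.sdk.timeseries._resampling._buffer import Buffer

    buf = Buffer[int](3, (1, 2))  # capacity 3, partially initialized
    buf.push(3)
    buf.push(4)  # buffer is full, so the oldest element (1) is dropped
    assert list(buf) == [2, 3, 4]
    assert buf[-1] == 4
    assert list(buf[1:]) == [3, 4]  # a view, no copy of the backing list
    buf.clear()
    assert len(buf) == 0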
Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 5 +- examples/resampling.py | 3 +- .../sdk/timeseries/_resampling/__init__.py | 21 ++ .../sdk/timeseries/_resampling/_buffer.py | 262 ++++++++++++++++++ .../_resampler.py} | 22 +- tests/timeseries/resampling/__init__.py | 4 + tests/timeseries/resampling/test_buffer.py | 177 ++++++++++++ tests/timeseries/test_resampling.py | 3 +- 8 files changed, 478 insertions(+), 19 deletions(-) create mode 100644 src/frequenz/sdk/timeseries/_resampling/__init__.py create mode 100644 src/frequenz/sdk/timeseries/_resampling/_buffer.py rename src/frequenz/sdk/timeseries/{_resampling.py => _resampling/_resampler.py} (95%) create mode 100644 tests/timeseries/resampling/__init__.py create mode 100644 tests/timeseries/resampling/test_buffer.py diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index 6e85708dc..ee7ad9cd6 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -8,7 +8,10 @@ from typing import Sequence from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper +from frequenz.sdk.timeseries._resampling._resampler import ( + ResamplerConfig, + _ResamplingHelper, +) def nop( # pylint: disable=unused-argument diff --git a/examples/resampling.py b/examples/resampling.py index 6412a240d..f045c2049 100644 --- a/examples/resampling.py +++ b/examples/resampling.py @@ -18,7 +18,8 @@ ) from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source +from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig +from frequenz.sdk.timeseries._resampling._resampler import Sink, Source HOST = "microgrid.sandbox.api.frequenz.io" PORT = 61060 diff --git a/src/frequenz/sdk/timeseries/_resampling/__init__.py b/src/frequenz/sdk/timeseries/_resampling/__init__.py new file mode 100644 index 000000000..11ef0349b --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/__init__.py @@ -0,0 +1,21 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Timeseries resampling.""" + + +from ._resampler import ( + Resampler, + ResamplerConfig, + ResamplingError, + ResamplingFunction, + SourceStoppedError, +) + +__all__ = [ + "Resampler", + "ResamplerConfig", + "ResamplingError", + "ResamplingFunction", + "SourceStoppedError", +] diff --git a/src/frequenz/sdk/timeseries/_resampling/_buffer.py b/src/frequenz/sdk/timeseries/_resampling/_buffer.py new file mode 100644 index 000000000..8021cf331 --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/_buffer.py @@ -0,0 +1,262 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Timeseries resampler buffer handling.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Sequence, TypeVar, overload + +T = TypeVar("T") + + +@dataclass +class _Uninitialized: + """A sentinel class to mark uninitialized data in the buffer.""" + + +_UNINITIALIZED = _Uninitialized() +"""A uninitialized sentinel instance.""" + + +class _BufferView(Sequence[T]): + """A read-only view into a Buffer. + + This is a ring buffer with the following structure: + + * It is a fixed size buffer. + * If the buffer isn't full, elements not yet filled up are represented with + a `_UNINITIALIZED` instance. 
+ * The `tail` pointer points to the first element in the view. + * The `head` pointer points past the last element in the view. + * The ring buffer can, of course, wrap around, so `head` can be smaller than `tail`. + * When `head` == `tail`, the view is empty, unless `_is_full` is `True`, in + which case the view covers the whole buffer. + """ + + def __init__( + self, *, buffer: list[_Uninitialized | T], full: bool, tail: int, head: int + ) -> None: + """Create an instance. + + Args: + buffer: The internal buffer that needs to be viewed. + full: Whether this is a view on a full buffer. + tail: The index of the first element of the view. + head: The index of the last element of the view. + """ + super().__init__() + self._buffer: list[_Uninitialized | T] = buffer + self._is_full: bool = full + self._tail: int = tail # Points to the oldest element in the buffer + self._head: int = head # Points to the newest element in the buffer + + @property + def capacity(self) -> int: + """Return the capacity of the internal buffer. + + Returns: + The capacity of the internal buffer. + """ + return len(self._buffer) + + def _adjust(self, position: int, *, offset: int = 0) -> int: + """Adjust a position to point to a valid index in the buffer. + + An optional `offset` can be passed if the position needs to be also + updated using some offset. + + Args: + position: The position index to adjust. + offset: The offset to use to calculate the new position. + + Returns: + The adjusted position index. + """ + return (position + offset) % self.capacity + + def _index(self, /, index: int) -> T: + """Get the nth element in this view. + + Args: + index: The index of the element to get. + + Returns: + The element in the nth position. + """ + if index < 0: + index += len(self) + if index < 0: + raise IndexError("Buffer index out of range") + if index >= len(self): + raise IndexError("Buffer index out of range") + index = self._adjust(self._tail, offset=index) + item = self._buffer[index] + assert not isinstance(item, _Uninitialized) + return item + + def _slice(self, /, slice_: slice) -> Sequence[T]: + """Get a slice on this view. + + Args: + slice_: The parameters of the slice to get. + + Returns: + The slice on this view. + """ + start, stop, step = slice_.indices(len(self)) + # This is a bit expensive, but making all the calculation each time + # an item is accessed would also be expensive. This way the slice + # might be expensive to construct but then is super cheap to use, + # which should probably be the most common sense. We also avoid + # having to implement a lot of weird corner cases. + if step != 1: + new_slice: list[T] = [] + for i in range(start, stop, step): + new_slice.append(self[i]) + return new_slice + + # If we have a step of 1, then things are very simple, and we can + # use another view on the same buffer. This is also probably the + # most common slicing case. + + # If the requested size is empty, then just return an empty tuple + if start >= stop: + return () + + tail = self._adjust(self._tail, offset=start) + head = self._adjust(self._tail, offset=stop) + slice_is_full = stop - start == self.capacity + return _BufferView( + buffer=self._buffer, full=slice_is_full, head=head, tail=tail + ) + + @overload + def __getitem__(self, index: int) -> T: + ... + + @overload + def __getitem__(self, index: slice) -> Sequence[T]: + ... + + def __getitem__(self, index: int | slice) -> T | Sequence[T]: + """Get an item or slice on this view. 
+ + Args: + index: The index of the element or parameters of the slice to get. + + Returns: + The element or slice on this view. + """ + if isinstance(index, slice): + return self._slice(index) + + return self._index(index) + + def __len__(self) -> int: + """Get the length of this view. + + Returns: + The length of this view. + """ + if self._is_full: + return self.capacity + if self._head == self._tail: + return 0 + offset = self.capacity if self._head < self._tail else 0 + return offset + self._head - self._tail + + def __repr__(self) -> str: + """Get the string representation of this view. + + Returns: + The string representation of this view. + """ + items = [] + if self._tail != self._head or self._is_full: + tail = self._tail + for _ in range(len(self)): + items.append(self._buffer[tail]) + tail = self._adjust(tail, offset=1) + return f"{self.__class__.__name__}({items})" + + def _debug_repr(self) -> str: + """Get the debug string representation of this view. + + Returns: + The debug string representation of this view. + """ + return ( + f"{self.__class__.__name__}(buffer={self._buffer!r}, " + "full={self._is_full!r}, tail={self._tail!r}, head={self._head!r})" + ) + + def __eq__(self, __o: object) -> bool: + """Compare this view to another object. + + Returns: + `True` if the other object is also a `Sequence` and all the + elements compare equal, `False` otherwise. + """ + if isinstance(__o, Sequence): + len_self = len(self) + if len_self != len(__o): + return False + for i in range(len_self): + if self[i] != __o[i]: + return False + return True + return super().__eq__(__o) + + +class Buffer(_BufferView[T]): + """A push-only, fixed-size ring buffer. + + Elements can be `push`ed to the buffer, when the buffer gets full, newer + elements will replace older elements. The buffer can be `clear`ed. + """ + + def __init__(self, /, capacity: int, initial_values: Sequence[T] = ()) -> None: + """Create an instance. + + Args: + capacity: The capacity of the buffer. + initial_values: The initial values to fill the buffer with. If it + has more items than the capacity, then only the last items will + be added. + """ + if capacity <= 0: + raise ValueError("The buffer capacity must be larger than 0") + if len(initial_values) > capacity: + initial_values = initial_values[-capacity:] + buffer: list[_Uninitialized | T] = list(initial_values) + is_full = False + if len(initial_values) < capacity: + buffer += [_UNINITIALIZED] * (capacity - len(initial_values)) + head = len(initial_values) + else: + is_full = True + head = 0 + super().__init__(buffer=buffer, full=is_full, tail=0, head=head) + + def clear(self) -> None: + """Clear the buffer.""" + self._head = 0 + self._tail = 0 + self._is_full = False + + def push(self, element: T) -> None: + """Push a new element to the buffer. + + If the buffer is full, the oldest element will be dropped. + + Args: + element: Element to push. 
+ """ + self._buffer[self._head] = element + self._head = self._adjust(self._head, offset=1) + if self._is_full: + self._tail = self._adjust(self._tail, offset=1) + if self._head == self._tail and not self._is_full: + self._is_full = True diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py similarity index 95% rename from src/frequenz/sdk/timeseries/_resampling.py rename to src/frequenz/sdk/timeseries/_resampling/_resampler.py index 3d060b220..1f4240be3 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -6,22 +6,17 @@ from __future__ import annotations import asyncio -import itertools -import logging import math from bisect import bisect -from collections import deque from dataclasses import dataclass from datetime import datetime, timedelta from typing import AsyncIterator, Callable, Coroutine, Sequence from frequenz.channels.util import Timer -from ..util.asyncio import cancel_and_await -from . import Sample - -_logger = logging.Logger(__name__) - +from ...util.asyncio import cancel_and_await +from .. import Sample +from ._buffer import Buffer DEFAULT_BUFFER_LEN_INIT = 16 """Default initial buffer length. @@ -325,7 +320,7 @@ def __init__(self, config: ResamplerConfig) -> None: config: The configuration for the resampler. """ self._config = config - self._buffer: deque[Sample] = deque(maxlen=config.initial_buffer_len) + self._buffer: Buffer = Buffer(config.initial_buffer_len) def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. @@ -333,7 +328,7 @@ def add_sample(self, sample: Sample) -> None: Args: sample: The sample to be added to the buffer. """ - self._buffer.append(sample) + self._buffer.push(sample) def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -356,10 +351,7 @@ def resample(self, timestamp: datetime) -> Sample: # compare samples at the moment. 
cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None)) # pylint: disable=fixme - # FIXME: This is far from efficient, but we don't want to start new - # ring buffer implementation here that uses a list to overcome the - # deque limitation of not being able to get slices - relevant_samples = list(itertools.islice(self._buffer, cut_index, None)) + relevant_samples = self._buffer[cut_index:] value = ( conf.resampling_function(relevant_samples, conf.resampling_period_s) if relevant_samples @@ -387,7 +379,7 @@ def __init__( self._helper: _ResamplingHelper = helper self._source: Source = source self._sink: Sink = sink - self._receiving_task: asyncio.Task = asyncio.create_task( + self._receiving_task: asyncio.Task[None] = asyncio.create_task( self._receive_samples() ) diff --git a/tests/timeseries/resampling/__init__.py b/tests/timeseries/resampling/__init__.py new file mode 100644 index 000000000..3e762810b --- /dev/null +++ b/tests/timeseries/resampling/__init__.py @@ -0,0 +1,4 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Frequenz Python SDK Tests.""" diff --git a/tests/timeseries/resampling/test_buffer.py b/tests/timeseries/resampling/test_buffer.py new file mode 100644 index 000000000..07f6e89bd --- /dev/null +++ b/tests/timeseries/resampling/test_buffer.py @@ -0,0 +1,177 @@ +# tests/timeseries/test_resampling.py:93: License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Tests for the resampling buffer.""" + +from __future__ import annotations + +from typing import Optional + +import pytest + +from frequenz.sdk.timeseries._resampling._buffer import Buffer + + +def test_buffer_init_zero_capacity() -> None: + """Test a buffer with zero capacity can't be created.""" + with pytest.raises(ValueError): + _ = Buffer[int](0) + + +def test_buffer_init_empty() -> None: + """Test an empty buffer is empty and can be pushed to.""" + buffer = Buffer[int](2) + + assert not buffer + assert buffer == [] # pylint: disable=use-implicit-booleaness-not-comparison + assert len(buffer) == 0 + for i in range(-5, 5): + with pytest.raises(IndexError): + _ = buffer[i] + + buffer.push(11) + assert buffer == [11] + + buffer.push(15) + assert buffer == [11, 15] + + buffer.push(25) + assert buffer == [15, 25] + + +def test_buffer_init_full() -> None: + """Test a full buffer can be created, pushed to, and drops old values.""" + init = (3, 2, 6, 10, 9) + buffer = Buffer[int](5, init) + + assert buffer == init + assert len(buffer) == len(init) + for i in range(5): + assert buffer[i] == init[i] + for i in range(-1, -6, -1): + assert buffer[i] == init[i] + with pytest.raises(IndexError): + _ = buffer[-6] + with pytest.raises(IndexError): + _ = buffer[5] + + buffer.push(11) + assert buffer == [2, 6, 10, 9, 11] + assert buffer[4] == 11 + with pytest.raises(IndexError): + _ = buffer[-6] + with pytest.raises(IndexError): + _ = buffer[5] + + +def test_buffer_init_partial() -> None: + """Test a partially initialized buffer. + + Test it can be created, pushed to, grows and eventuall drops old values. 
+ """ + init = (2, 9) + buffer = Buffer[int](4, init) + + assert buffer == init + assert len(buffer) == len(init) + with pytest.raises(IndexError): + _ = buffer[-3] + with pytest.raises(IndexError): + _ = buffer[2] + for i in range(2): + assert buffer[i] == init[i] + for i in range(-1, -3, -1): + assert buffer[i] == init[i] + + buffer.push(11) + assert buffer == [2, 9, 11] + + buffer.push(15) + assert buffer == [2, 9, 11, 15] + + buffer.push(25) + assert buffer == [9, 11, 15, 25] + + +def test_buffer_push() -> None: + """Test a buffer can be pushed to.""" + buffer = Buffer[int](2) + + with pytest.raises(IndexError): + _ = buffer[0] + with pytest.raises(IndexError): + _ = buffer[1] + with pytest.raises(IndexError): + _ = buffer[2] + + buffer.push(10) + assert len(buffer) == 1 + assert buffer == [10] + assert buffer[0] == 10 + with pytest.raises(IndexError): + _ = buffer[2] + + buffer.push(20) + assert len(buffer) == 2 + assert buffer == [10, 20] + assert buffer[0] == 10 + assert buffer[1] == 20 + with pytest.raises(IndexError): + _ = buffer[2] + + buffer.push(5) + assert len(buffer) == 2 + assert buffer == [20, 5] + assert buffer[0] == 20 + assert buffer[1] == 5 + with pytest.raises(IndexError): + _ = buffer[2] + + +def test_buffer_clear() -> None: + """Test a buffer can be cleared correctly.""" + buffer = Buffer[int](2, (1, 2)) + + buffer.clear() + assert not buffer + assert buffer == [] # pylint: disable=use-implicit-booleaness-not-comparison + assert len(buffer) == 0 + with pytest.raises(IndexError): + _ = buffer[0] + with pytest.raises(IndexError): + _ = buffer[1] + with pytest.raises(IndexError): + _ = buffer[2] + + +@pytest.mark.parametrize("step", [None, 1, 2, 5, -5, -2, -1]) +@pytest.mark.parametrize("stop", [0, None, 1, 3, 5, -5, -3, -1]) +@pytest.mark.parametrize("start", [0, None, 1, 3, 5, -5, -3, -1]) +def test_slice(start: Optional[int], stop: Optional[int], step: Optional[int]) -> None: + """Test slicing operations.""" + init = (3, 2, 6, 10, 9) + buffer = Buffer[int](5, init) + + init_slice = init[start:stop:step] + slice_ = buffer[start:stop:step] + + assert len(slice_) == len(init_slice) + assert list(slice_) == list(init_slice) + for i in range(3): + with pytest.raises(IndexError): + _ = init_slice[-len(init_slice) - i - 1] + with pytest.raises(IndexError): + # For some reason pylint thinks slice_ is unsubscriptable, but the + # type is properly inferred as Sequence[int], so it looks like + # a pylint bug + # pylint: disable=unsubscriptable-object + _ = slice_[-len(init_slice) - i - 1] + with pytest.raises(IndexError): + # pylint: disable=unsubscriptable-object + _ = slice_[len(init_slice) + i] + for i in range(len(init_slice)): # pylint: disable=consider-using-enumerate + # pylint: disable=unsubscriptable-object + assert slice_[i] == init_slice[i] + for i in range(-1, -len(init_slice) - 1, -1): + # pylint: disable=unsubscriptable-object + assert slice_[i] == init_slice[i] diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 61b0fdb28..55014b49d 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -20,10 +20,9 @@ ResamplerConfig, ResamplingError, ResamplingFunction, - Sink, - Source, SourceStoppedError, ) +from frequenz.sdk.timeseries._resampling._resampler import Sink, Source from ..utils import a_sequence