Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a custom resampling buffer #130

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 55 additions & 0 deletions benchmarks/timeseries/resampling.py
@@ -0,0 +1,55 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Benchmark resampling."""

from datetime import datetime, timedelta, timezone
from timeit import timeit
from typing import Sequence

from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._resampling._resampler import (
ResamplerConfig,
_ResamplingHelper,
)


def nop(  # pylint: disable=unused-argument
    samples: Sequence[Sample], resampling_period_s: float
) -> float:
    """Ignore the given samples and period and produce a constant value.

    Args:
        samples: The samples to resample (unused).
        resampling_period_s: The resampling period in seconds (unused).

    Returns:
        Always `0.0`.
    """
    return 0.0


def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
    """Benchmark the resampling helper.

    Builds a `_ResamplingHelper` with a 1 second resampling period and the
    `nop` resampling function, then times a workload that adds `samples`
    samples followed by one `resample()` call, repeated `resamples` times.
    The workload is executed 5 times by `timeit` and the resulting time is
    printed.

    Args:
        resamples: How many resampling rounds one execution of the workload
            performs.
        samples: How many samples are added to the helper before each
            resampling.
    """
    helper = _ResamplingHelper(
        ResamplerConfig(
            resampling_period_s=1.0,
            max_data_age_in_periods=3.0,
            resampling_function=nop,
            initial_buffer_len=samples * 3,
        )
    )
    now = datetime.now(timezone.utc)
    # The spacing between samples never changes, so build it once instead of
    # once per sample inside the timed region. The guard keeps `samples == 0`
    # working: in that case the inner loop never runs and the interval is
    # never used, so no division must happen.
    sample_interval = timedelta(seconds=1 / samples) if samples else timedelta()

    def _do_work() -> None:
        # `now` is shared across all timed executions, so timestamps keep
        # increasing from one execution to the next.
        nonlocal now
        for _n_resample in range(resamples):
            for _n_sample in range(samples):
                now += sample_interval
                helper.add_sample(Sample(now, 0.0))
            helper.resample(now)

    print(timeit(_do_work, number=5))


def _benchmark() -> None:
    """Run the resampling helper benchmark for every workload combination.

    Each combination of resampling rounds and samples per round is announced
    with a line showing its parameters before the benchmark for it is run.
    """
    sizes = (10, 100, 1000)
    for resamples in sizes:
        for samples in sizes:
            print(f"{resamples=} {samples=}")
            _benchmark_resampling_helper(resamples, samples)


if __name__ == "__main__":
    _benchmark()
7 changes: 4 additions & 3 deletions examples/resampling.py
Expand Up @@ -18,7 +18,8 @@
)
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source
from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig
from frequenz.sdk.timeseries._resampling._resampler import Sink, Source

HOST = "microgrid.sandbox.api.frequenz.io"
PORT = 61060
Expand Down Expand Up @@ -65,7 +66,7 @@ async def run() -> None: # pylint: disable=too-many-locals
channel_registry=channel_registry,
data_sourcing_request_sender=data_source_request_sender,
resampling_request_receiver=resampling_request_receiver,
resampling_period_s=1,
config=ResamplerConfig(resampling_period_s=1),
)

components = await microgrid.get().api_client.components()
Expand Down Expand Up @@ -104,7 +105,7 @@ async def run() -> None: # pylint: disable=too-many-locals
# Create a channel to calculate an average for all the data
average_chan = Broadcast[Sample]("average")

second_stage_resampler = Resampler(resampling_period_s=3.0)
second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0))
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)

average_sender = average_chan.new_sender()
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"sympy >= 1.10.1, < 2",
"toml >= 0.10",
"tqdm >= 4.38.0, < 5",
"typing_extensions >= 4.4.0, < 5",
"watchfiles >= 0.15.0",
]
dynamic = [ "version" ]
Expand Down
3 changes: 2 additions & 1 deletion src/frequenz/sdk/actor/__init__.py
Expand Up @@ -7,13 +7,14 @@
from ._config_managing import ConfigManagingActor
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
from ._decorator import actor
from ._resampling import ComponentMetricsResamplingActor
from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig

__all__ = [
"ChannelRegistry",
"ComponentMetricRequest",
"ComponentMetricsResamplingActor",
"ConfigManagingActor",
"DataSourcingActor",
"ResamplerConfig",
"actor",
]
33 changes: 4 additions & 29 deletions src/frequenz/sdk/actor/_resampling.py
Expand Up @@ -14,12 +14,7 @@
from frequenz.sdk.util.asyncio import cancel_and_await

from ..timeseries import Sample
from ..timeseries._resampling import (
Resampler,
ResamplingError,
ResamplingFunction,
average,
)
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
from ._channel_registry import ChannelRegistry
from ._data_sourcing import ComponentMetricRequest
from ._decorator import actor
Expand All @@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments
channel_registry: ChannelRegistry,
data_sourcing_request_sender: Sender[ComponentMetricRequest],
resampling_request_receiver: Receiver[ComponentMetricRequest],
resampling_period_s: float = 0.2,
max_data_age_in_periods: float = 3.0,
resampling_function: ResamplingFunction = average,
config: ResamplerConfig,
) -> None:
"""Initialize an instance.

Expand All @@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments
to subscribe to component metrics.
resampling_request_receiver: The receiver to use to receive new
resampling subscription requests.
resampling_period_s: The time that passes between calculations of
resampled data (in seconds).
max_data_age_in_periods: The maximum age a sample can have to be
considered *relevant* for resampling purposes, expressed in the
number of resampling periods. For example, if
`resampling_period_s` is 3 and `max_data_age_in_periods` is 2,
then data older than `3*2 = 6` seconds will be discarded when
creating a new sample and never passed to the resampling
function.
resampling_function: The function to be applied to the sequence of
*relevant* samples at a given time. The result of the function
is what is sent as the resampled data.
config: The configuration for the resampler.
"""
self._channel_registry: ChannelRegistry = channel_registry
self._resampling_period_s: float = resampling_period_s
self._max_data_age_in_periods: float = max_data_age_in_periods
self._resampling_function: ResamplingFunction = resampling_function
self._data_sourcing_request_sender: Sender[
ComponentMetricRequest
] = data_sourcing_request_sender
self._resampling_request_receiver: Receiver[
ComponentMetricRequest
] = resampling_request_receiver
self._resampler: Resampler = Resampler(
resampling_period_s=resampling_period_s,
max_data_age_in_periods=max_data_age_in_periods,
resampling_function=resampling_function,
)
self._resampler: Resampler = Resampler(config)
self._active_req_channels: set[str] = set()

async def _subscribe(self, request: ComponentMetricRequest) -> None:
Expand Down
15 changes: 11 additions & 4 deletions src/frequenz/sdk/timeseries/_base_types.py
Expand Up @@ -3,12 +3,16 @@

"""Timeseries basic types."""

from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
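# Ordering by timestamp is a bit arbitrary, and it is not always what might be
# wanted. We are using this order now because usually we need to do binary
# searches on sequences of samples, and the Python `bisect` module doesn't
# support providing a key until Python 3.10.
# Note: excluding `value` from comparisons (`compare=False`) also excludes it
# from the generated `__eq__` and `__hash__`, so two samples with the same
# timestamp compare equal even if their values differ.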
@dataclass(frozen=True, order=True)
class Sample:
"""A measurement taken at a particular point in time.

Expand All @@ -17,5 +21,8 @@ class Sample:
coherent view on a group of component metrics for a particular timestamp.
"""

timestamp: datetime
value: Optional[float] = None
timestamp: datetime = field(compare=True)
"""The time when this sample was generated."""

value: Optional[float] = field(compare=False, default=None)
"""The value of this sample."""
21 changes: 21 additions & 0 deletions src/frequenz/sdk/timeseries/_resampling/__init__.py
@@ -0,0 +1,21 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Timeseries resampling."""


from ._resampler import (
Resampler,
ResamplerConfig,
ResamplingError,
ResamplingFunction,
SourceStoppedError,
)

__all__ = [
"Resampler",
"ResamplerConfig",
"ResamplingError",
"ResamplingFunction",
"SourceStoppedError",
]