Implement EventResampler for Cascaded Resampling #1372

malteschaaf wants to merge 5 commits into frequenz-floss:v1.x.x from
Conversation
Extract window emission logic from `resample()` into a dedicated `_emit_window()` method to allow code sharing between Timer-based and Event-driven resampler implementations.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Add ability for `StreamingHelper` to notify external consumers when samples arrive via a callback function. This enables event-driven resampler implementations to receive samples without polling internal buffers.

Changes:
- Added `_sample_callback` attribute to store the callback function
- Added `register_sample_callback()` method to register an async callback
- Modified `_receive_samples()` to invoke the callback when a sample is added to the buffer

This mechanism is used by `EventResampler` to implement event-driven sample processing instead of timer-based polling.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
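As a rough illustration of the mechanism this commit describes (class and method names are simplified stand-ins, not the SDK's actual types), the registered callback can feed an `asyncio.Queue` that an event-driven loop consumes:

```python
import asyncio
from typing import Awaitable, Callable, Optional

class StreamingHelperSketch:
    """Simplified stand-in for the helper's callback mechanism."""

    def __init__(self) -> None:
        self._buffer: list[float] = []
        self._sample_callback: Optional[Callable[[float], Awaitable[None]]] = None

    def register_sample_callback(
        self, callback: Callable[[float], Awaitable[None]]
    ) -> None:
        """Register an async callback invoked for every received sample."""
        self._sample_callback = callback

    async def receive_sample(self, sample: float) -> None:
        """Buffer the sample and notify the registered consumer, if any."""
        self._buffer.append(sample)
        if self._sample_callback is not None:
            await self._sample_callback(sample)

async def demo() -> list[float]:
    queue: asyncio.Queue[float] = asyncio.Queue()
    helper = StreamingHelperSketch()
    helper.register_sample_callback(queue.put)  # event-driven, no polling

    for s in (1.0, 2.0, 3.0):
        await helper.receive_sample(s)
    # Drain what the consumer side would see.
    return [queue.get_nowait() for _ in range(queue.qsize())]
```

Passing `queue.put` directly as the callback is enough here because `asyncio.Queue.put` is itself a coroutine function.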
Add `EventResampler` class that uses event-driven window management instead of timer-based intervals. This solves data loss issues when cascading resamplers.

Problem: When cascading Timer-based resamplers (e.g., 1s → 10s) with `align_to=UNIX_EPOCH`, samples can be lost at window boundaries due to timing synchronization.

Solution: EventResampler opens/closes windows based on sample arrival timestamps instead of fixed intervals, ensuring no data loss at boundaries.

Changes:
- Added `EventResampler` class that inherits from `Resampler`
- Windows are emitted when a sample arrives with `timestamp >= window_end`
- Maintains window alignment through simple addition of `resampling_period`
- Uses sample callback mechanism from `StreamingHelper` for event-driven processing

`EventResampler` is optimized for cascaded resampling and should not be used directly with raw, irregular data.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
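The boundary rule in this commit message can be sketched as follows (the function name and signature are illustrative, not the PR's actual code):

```python
from datetime import datetime, timedelta, timezone

def advance_window(
    window_end: datetime, sample_ts: datetime, period: timedelta
) -> tuple[datetime, int]:
    """Emit every window whose end is <= the arriving sample's timestamp.

    Alignment is preserved by simple addition of the resampling period,
    so missed windows are emitted in order once data resumes.
    """
    emitted = 0
    while sample_ts >= window_end:
        emitted += 1  # a real resampler would call _emit_window() here
        window_end += period
    return window_end, emitted
```

For a 10 s period with the current window ending at 12:00:10, a sample stamped 12:00:31 would emit three windows (ending 12:00:10, 12:00:20, and 12:00:30) and leave the next window end at 12:00:40.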
Add test suite for `EventResampler` covering window initialization, boundary conditions, and alignment behavior. Tests are parametrized to verify correct behavior with and without `align_to` configuration.

Changes:
- Added tests for `EventResampler` initialization and window end calculation
- Added tests for sample processing before, at, and after window boundaries
- Added tests for correct behavior when samples cross multiple windows
- Added tests verifying window alignment is maintained through simple addition
- Added key test demonstrating no data loss at window boundaries

Tests use parametrized fixtures to cover both aligned and non-aligned window scenarios, ensuring the event-driven window management works correctly in all cases.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
| """Cancel the receiving task.""" | ||
| await cancel_and_await(self._receiving_task) | ||
|
|
||
| def register_sample_callback( |
To me, register would suggest that multiple callbacks can exist. How about set instead?
| def __init__(self, config: ResamplerConfig) -> None:
| """Initialize EventResampler.
|
| This does not call super().__init__() to avoid starting any timers
self._timer is not used at all, right? I think it would be cleaner then to extract a common base class from the existing Resampler and EventResampler.
| try:
| while True:
| sample = await self._sample_queue.get()
| emmitted = await self._process_sample(sample)
emitted
A few more occurrences below.
| Returns:
| True if at least one window was emitted, False otherwise.
| """
| async with self._window_lock:
self._window_end is only written to in __init__ and here, correct? So the lock is needed to protect concurrent calls to _process_sample? Just trying to understand it :)
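To illustrate the reviewer's reading (a sketch, not the PR's code): the lock makes the check-then-advance on the shared window end atomic across concurrent `_process_sample()` calls.

```python
import asyncio
from datetime import datetime, timedelta, timezone

class WindowStateSketch:
    """Shared window state touched by concurrent sample processing."""

    def __init__(self, window_end: datetime, period: timedelta) -> None:
        self._window_end = window_end
        self._period = period
        self._lock = asyncio.Lock()
        self.emitted = 0

    async def process_sample(self, ts: datetime) -> None:
        # In the real resampler, _emit_window() is awaited inside this loop,
        # creating a suspension point where another task could interleave;
        # the lock serializes the whole check-and-advance.
        async with self._lock:
            while ts >= self._window_end:
                self.emitted += 1
                self._window_end += self._period
```

With the lock, two concurrent calls for the same boundary timestamp emit the window exactly once: the first call advances `_window_end`, so the second call sees the boundary already passed.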
| @pytest.fixture | ||
| def now() -> datetime: | ||
| """Fixture providing a fixed current time for testing.""" | ||
| return datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc) |
Could it make sense to use something like freezegun in these tests? Or that fake_time fixture used elsewhere in this repo?
We use async-solipsism and time-machine. There should be tests in there using them.
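A stdlib-only sketch of the controllable-clock idea behind that suggestion (the repo's actual fixtures use async-solipsism and time-machine; this stand-in just shows the shape, nothing here is from those libraries):

```python
from datetime import datetime, timedelta, timezone

class FakeClock:
    """Minimal controllable clock: tests advance time explicitly instead
    of hardcoding a single fixed datetime in a fixture."""

    def __init__(self, start: datetime) -> None:
        self._now = start

    def now(self) -> datetime:
        """Return the current fake time."""
        return self._now

    def advance(self, delta: timedelta) -> None:
        """Move the fake time forward by delta."""
        self._now += delta
```

Code under test would take the clock's `now` callable instead of calling `datetime.now()` directly, which is the same inversion time-machine achieves by patching.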
llucax left a comment
Unfortunately I don't think this is the way to go. To me it looks too hacky to use/inherit from the resampler to implement this by adding the callback function. If we are going for this, we'll probably need to rethink the whole resampler and probably write the new one from scratch. For example, the whole time-jump logic is not needed if we just receive samples and trigger a resampling when the first sample with timestamp >= window_end arrives.
There are other complexities introduced that feel unnecessary, like adding locks and queues; these are not good signs.
I also asked AI to analyze the PR, and it found some important bugs. I didn't validate them yet (because I think the reasons above are enough to try to find another approach), but I will leave them here in case you want to look at them (I wouldn't, because I think we need to find another approach even if those bugs aren't real).
AI review
The biggest blocker is a multi-timeseries correctness bug. EventResampler keeps one global _window_end and one global queue for all sources. Every _StreamingHelper registers the same callback, the queue stores only Sample objects without source identity, and when any queued sample crosses the boundary, _process_sample() calls _emit_window() for all registered helpers. In practice, the first source whose t=10 sample arrives will close the t=10 window for every other source too, so a slightly slower source can still miss its boundary sample. That recreates the same class of boundary-loss bug this PR is trying to solve, just at the “multiple sources in one resampler” level.
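One possible shape of a fix for this (hypothetical, not code from the PR): key the window state by source and pass the source identity along with each sample, so a fast source only ever closes its own windows.

```python
from datetime import datetime, timedelta, timezone

class PerSourceWindowsSketch:
    """Track a separate window_end per source instead of one global value."""

    def __init__(self, first_end: datetime, period: timedelta) -> None:
        self._first_end = first_end
        self._period = period
        self._window_end: dict[str, datetime] = {}
        self.emitted: list[tuple[str, datetime]] = []

    def process(self, source_id: str, ts: datetime) -> None:
        """Handle one (source, timestamp) pair from the queue."""
        end = self._window_end.setdefault(source_id, self._first_end)
        while ts >= end:
            # Only this source's window closes; other sources are untouched.
            self.emitted.append((source_id, end))
            end += self._period
        self._window_end[source_id] = end
```

With this shape, a slower source that has not yet delivered its boundary sample keeps its window open even after a faster source has crossed the same boundary.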
A second blocker is the lifecycle/remove bug. remove_timeseries() still only deletes the dict entry and does not stop the receiving task; the PR description explicitly calls out that removed sources keep reading, buffering, and invoking callbacks, and that the queue can grow unbounded for EventResampler. With the current shared-queue design, that is worse than a leak: removed sources can continue to enqueue samples that advance the global window for the remaining active sources. I would not merge this without cancelling/unregistering the helper on removal.
There is also a public API/docs mismatch. The PR description shows from frequenz.sdk.timeseries import EventResampler, but the package frequenz.sdk.timeseries.__init__ on the PR branch still does not export EventResampler; the new tests also import the private module path directly. So either the advertised import path is broken, or this class is intentionally still private and the PR text/release notes are overstating availability. Add the public export plus a public-import test, or document it as private/internal for now.
I also see an edge-case semantic gap around idle/end-of-stream behavior. The event-driven loop only emits windows from _process_sample() after a later sample arrives, and the class doc explicitly says missed windows are emitted “when data resumes.” I do not see a path that flushes the final pending window on source stop, shutdown, or long idle periods without a future sample. That is a behavior difference from the timer-based resampler and should be either handled explicitly or clearly documented and tested. This is an inference from the control flow, not something the PR text states directly.
On security, I do not see injection/auth/crypto problems here. The main issue is resource exhaustion: _sample_queue is an unbounded asyncio.Queue, and the PR itself already notes unbounded growth after remove_timeseries(). A noisy, stalled, or orphaned source can therefore consume memory without limit.
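One way to cap that growth (an illustrative policy, not something the PR implements) is a bounded queue that drops the oldest sample when full instead of growing without limit:

```python
import asyncio

def put_dropping_oldest(queue: asyncio.Queue[int], item: int) -> None:
    """Enqueue without blocking; on overflow, discard the oldest item."""
    try:
        queue.put_nowait(item)
    except asyncio.QueueFull:
        queue.get_nowait()  # make room by dropping the oldest entry
        queue.put_nowait(item)
```

For a resampler, dropping the oldest sample is usually the right bias, since newer samples are the ones that decide whether a window closes; but any bounded policy beats unbounded growth from a stalled consumer.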
For breaking changes, I did not find an existing public API signature change, removed/renamed public method, return-type break, DB schema change, or config-surface break in the files changed. The main API problem is with the new feature’s availability and import path, not with existing consumers.
Release notes are present, but not quite right. It is good that RELEASE_NOTES.md was updated, but two of the entries are private implementation details: StreamingHelper.register_sample_callback() and Resampler._emit_window(). Those do not look like user-facing release-note items. I would keep the EventResampler note, add caveats about intended usage/limitations, and drop or re-scope the private internals.
Documentation looks incomplete. The PR changes only four files: release notes, two implementation files, and tests. I do not see README/public API doc updates, and the only user-facing usage example I can find is the PR description itself. For a feature presented as a new resampler users can adopt, that is thin.
Testing is the other major gap. The new tests are good for boundary arithmetic, but they mostly unit-test _process_sample() with _emit_window() mocked out. They do not cover the highest-risk integration paths: multiple active sources, add_timeseries()/_StreamingHelper end-to-end behavior, remove_timeseries(), source stop, long idle gaps, one-shot semantics, backpressure/queue growth, error propagation, or the public import path. So the current tests miss the areas most likely to fail in production.
My must-fix list before merge would be: fix the multi-source window-closing logic, fix helper cancellation/unregistration on removal, decide whether EventResampler is public and wire exports/docs accordingly, and add integration tests for multi-source and lifecycle behavior.
Problem
When cascading Timer-based resamplers (e.g., 1s → 10s) with `align_to=UNIX_EPOCH`, samples can be lost at window boundaries due to timing synchronization issues.

Example:

This occurs because both resamplers use independent timers aligned to the same epoch, but their tick times are not synchronized.
Solution
`EventResampler` uses event-driven window management instead of fixed timer intervals. Windows are emitted when a sample arrives with a timestamp >= current `window_end`, not on a timer schedule.

Benefits:
Implementation
Core Components
- `EventResampler` class: event-driven resampler inheriting from `Resampler`
  - `resample()` with event-driven loop
  - `_process_sample()` for window state management
  - `_calculate_window_end()` (simplified for event-driven scenario)
- Sample callback mechanism: added to `StreamingHelper`
  - `register_sample_callback()` method
- Refactored `_emit_window()`: extracted from `Resampler.resample()`

Design Decision: asyncio.Queue + Callbacks
After evaluating multiple approaches (polling, direct channels, event signals, task-based multiplexing), I chose `asyncio.Queue` with callbacks because:

- Event-driven (`await queue.get()`), not polling
- `Queue[Sample[Quantity]]` ensures type correctness
- `asyncio.Queue` is designed specifically for this use case

Changes Made
StreamingHelper

- Added `_sample_callback` attribute to store the callback function
- Added `register_sample_callback()` method to register an async callback
- Modified `_receive_samples()` to invoke the callback when a sample arrives

Resampler

- Extracted `_emit_window()` method from `resample()`
- Updated `resample()` to delegate window emission to `_emit_window()`

EventResampler (New)

- Simplified `_calculate_window_end()` (no warm-up period needed)

Tests
Open Question: Task Cancellation in remove_timeseries()

Observation: When `remove_timeseries()` is called, the `_receiving_task` continues running in the background. It will keep reading samples, buffering them, and invoking callbacks.

Current behavior: the helper entry is deleted from the dict, but the receiving task is not stopped.

Impact: for `EventResampler`, the sample queue can grow unbounded.
Question for reviewers:
Should `remove_timeseries()` cancel the receiving task for both `TimerResampler` and `EventResampler`? If yes, we need to either:

- Make `remove_timeseries()` async and call `await helper.stop()`, or
- Add a `cancel_receiving()` method to `_StreamingHelper`

This should be addressed as a separate issue for both resamplers, not just `EventResampler`.
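Either option boils down to the cancel-and-await pattern already used by `stop()` (see the quoted `cancel_and_await(self._receiving_task)` above). A self-contained sketch of that pattern, with an illustrative name:

```python
import asyncio

async def cancel_receiving_sketch(task: asyncio.Task) -> None:
    """Cancel a receiving task and wait for it to actually finish."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # cancellation is the expected outcome here
```

Awaiting the cancelled task matters: it guarantees the task has released its resources before `remove_timeseries()` returns, rather than leaving it to die at some later point in the event loop.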
Backward Compatibility

- `Resampler` API unchanged
- `StreamingHelper` additions are backward compatible (callback defaults to `None`)
- `EventResampler` is opt-in (users must explicitly use it)

Usage
Testing

- Tests are parametrized to cover scenarios with and without `align_to`