Skip to content

Commit

Permalink
[reporting] Add flag to filter the data with a the latest version sou…
Browse files Browse the repository at this point in the history
…rce (#1045)

* Add flag to filter the data with a source in its latest version.

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* Add method to get data sources in a time window and a function to select the last version among all the sources with the same (name, type, model)

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* add event_resolution

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* add two extra test cases

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix time filter

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* add changelog entry

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* Import annotations

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* Fix Version

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* Convert date to pandas Timestamp

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

---------

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>
  • Loading branch information
victorgarcia98 committed May 21, 2024
1 parent 14da4cb commit 70f8041
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 1 deletion.
9 changes: 9 additions & 0 deletions documentation/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
FlexMeasures Changelog
**********************

v0.22.0 | June XX, 2024
============================

New features
-------------

* `PandasReporter` accepts the parameter `use_latest_version_only` to filter input data [see `PR #1045 <https://github.com/FlexMeasures/flexmeasures/pull/1045/>`_]


v0.21.0 | April 16, 2024
============================

Expand Down
39 changes: 39 additions & 0 deletions flexmeasures/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,45 @@ def soc_sensors(db, add_battery_assets, setup_sources) -> tuple:
yield soc_maxima, soc_minima, soc_targets, values


@pytest.fixture(scope="module")
def setup_multiple_sources(db, add_battery_assets):
battery = add_battery_assets["Test battery with dynamic power capacity"]

test_sensor = Sensor(
name="test sensor",
generic_asset=battery,
unit="kW",
event_resolution=timedelta(minutes=15),
)

s1 = DataSource(name="S1", type="type 1")
s2 = DataSource(name="S2", type="type 2")
s3 = DataSource(name="S3", type="type 3")

db.session.add_all([s1, s2, s3, test_sensor])

for s in [s1, s2]:
add_beliefs(
db=db,
sensor=test_sensor,
time_slots=[pd.Timestamp("2024-01-01T10:00:00+01:00")],
values=[1],
source=s,
)

add_beliefs(
db=db,
sensor=test_sensor,
time_slots=[pd.Timestamp("2024-01-02T10:00:00+01:00")],
values=[1],
source=s3,
)

db.session.commit()

return test_sensor, s1, s2, s3


def add_beliefs(
db,
sensor: Sensor,
Expand Down
26 changes: 26 additions & 0 deletions flexmeasures/data/models/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import timely_beliefs as tb

from packaging.version import Version

from flexmeasures.data import db
from flask import current_app
import hashlib
Expand Down Expand Up @@ -363,3 +365,27 @@ def has_attribute(self, attribute: str) -> bool:

def set_attribute(self, attribute: str, value):
self.attributes[attribute] = value


def keep_latest_version(data_sources: list[DataSource]) -> list[DataSource]:
"""
Filters the given list of data sources to only include the latest version
of each unique combination of (name, type, and model).
"""
sources = dict()

for source in data_sources:
key = (source.name, source.type, source.model)
if key not in sources:
sources[key] = source
else:
sources[key] = max(
[source, sources[key]],
key=lambda x: Version(x.version if x.version else "0.0.0"),
)

last_version_sources = []
for source in sources.values():
last_version_sources.append(source)

return last_version_sources
23 changes: 22 additions & 1 deletion flexmeasures/data/models/reporting/pandas_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
PandasReporterParametersSchema,
)
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.data_sources import keep_latest_version
from flexmeasures.utils.time_utils import server_now


Expand All @@ -38,6 +39,7 @@ def fetch_data(
input: dict,
resolution: timedelta | None = None,
belief_time: datetime | None = None,
use_latest_version_only: bool = False,
):
"""
Fetches the time_beliefs from the database
Expand All @@ -58,12 +60,28 @@ def fetch_data(
event_ends_before = _input_search_parameters.pop("event_ends_before", end)
resolution = _input_search_parameters.pop("resolution", resolution)
belief_time = _input_search_parameters.pop("belief_time", belief_time)
source = _input_search_parameters.pop(
"source", _input_search_parameters.pop("sources", None)
)

if use_latest_version_only and source is None:
source = sensor.search_data_sources(
event_starts_after=start,
event_ends_before=end,
source_types=_input_search_parameters.get("source_types"),
exclude_source_types=_input_search_parameters.get(
"exclude_source_types"
),
)
if len(source) > 0:
source = keep_latest_version(source)

bdf = sensor.search_beliefs(
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
resolution=resolution,
beliefs_before=belief_time,
source=source,
**_input_search_parameters,
)

Expand All @@ -89,13 +107,16 @@ def _compute_report(self, **kwargs) -> list[dict[str, Any]]:
belief_time: datetime | None = kwargs.get("belief_time", None)
belief_horizon: timedelta | None = kwargs.get("belief_horizon", None)
output: list[dict[str, Any]] = kwargs.get("output")
use_latest_version_only: bool = kwargs.get("use_latest_version_only", False)

# by default, use the minimum resolution among the input sensors
if resolution is None:
resolution = min([i["sensor"].event_resolution for i in input])

# fetch sensor data
self.fetch_data(start, end, input, resolution, belief_time)
self.fetch_data(
start, end, input, resolution, belief_time, use_latest_version_only
)

if belief_time is None:
belief_time = server_now()
Expand Down
27 changes: 27 additions & 0 deletions flexmeasures/data/models/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,33 @@ def make_hashable(self) -> tuple:

return (self.id, self.attributes, self.generic_asset.attributes)

def search_data_sources(
self,
event_starts_after: datetime_type | None = None,
event_ends_before: datetime_type | None = None,
source_types: list[str] | None = None,
exclude_source_types: list[str] | None = None,
) -> list[DataSource]:

q = select(DataSource).join(TimedBelief).filter(TimedBelief.sensor == self)

if event_starts_after:
q = q.filter(TimedBelief.event_start >= event_starts_after)

if event_ends_before:
q = q.filter(
TimedBelief.event_start
<= pd.Timestamp(event_ends_before) - self.event_resolution
)

if source_types:
q = q.filter(DataSource.type.in_(source_types))

if exclude_source_types:
q = q.filter(DataSource.type.not_in(exclude_source_types))

return db.session.scalars(q).all()


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
"""A timed belief holds a precisely timed record of a belief about an event.
Expand Down
1 change: 1 addition & 0 deletions flexmeasures/data/schemas/reporting/pandas_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class PandasReporterParametersSchema(ReporterParametersSchema):
# for the single sensors in `input_variables`
start = AwareDateTimeField(required=False)
end = AwareDateTimeField(required=False)
use_latest_version_only = fields.Bool(required=False, default=False)

@validates_schema
def validate_time_parameters(self, data, **kwargs):
Expand Down
21 changes: 21 additions & 0 deletions flexmeasures/data/tests/test_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from flexmeasures.data.models.reporting import Reporter

from flexmeasures.data.models.data_sources import keep_latest_version, DataSource

from datetime import datetime
from pytz import UTC

Expand Down Expand Up @@ -143,3 +145,22 @@ def test_data_generator_save_parameters(
# from the method call (e.g. field `b``)
assert dg2._parameters["b"] == parameters_2["b"]
assert dg2._parameters["start"].isoformat() == parameters_2["start"]


def test_keep_last_version():
s1 = DataSource(name="s1", model="model 1", type="forecaster", version="0.1.0")
s2 = DataSource(name="s1", model="model 1", type="forecaster")
s3 = DataSource(name="s1", model="model 2", type="forecaster")
s4 = DataSource(name="s1", model="model 2", type="scheduler")

# the data source with no version is assumed to have version 0.0.0
assert keep_latest_version([s1, s2]) == [s1]

# sources with different models are preserved
assert keep_latest_version([s1, s2, s3]) == [s1, s3]

# two sources with the same model but different types
assert keep_latest_version([s3, s4]) == [s3, s4]

# repeated source
assert keep_latest_version([s1, s1]) == [s1]
29 changes: 29 additions & 0 deletions flexmeasures/data/tests/test_queries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
from datetime import datetime, timedelta

import numpy as np
Expand Down Expand Up @@ -276,3 +277,31 @@ def test_persist_beliefs(setup_beliefs, setup_test_data, db):
sensor, source=source, most_recent_beliefs_only=False
)
assert len(bdf) == setup_beliefs * 2


def test_search_sources(db, setup_multiple_sources):
test_sensor, s1, s2, s3 = setup_multiple_sources

def get_sources_names(vec: list[DataSource]) -> list[str]:
return [s.name for s in vec]

# no filter
assert get_sources_names(test_sensor.search_data_sources()) == ["S1", "S2", "S3"]

# exclude results by type
assert get_sources_names(
test_sensor.search_data_sources(exclude_source_types=["type 1"])
) == ["S2", "S3"]

# filter by type
assert get_sources_names(
test_sensor.search_data_sources(source_types=["type 2"])
) == ["S2"]

# time window filter
assert get_sources_names(
test_sensor.search_data_sources(
event_starts_after="2024-01-01T00:00:00+01:00",
event_ends_before="2024-01-02T00:00:00+01:00",
)
) == ["S1", "S2"]

0 comments on commit 70f8041

Please sign in to comment.