Skip to content

Commit

Permalink
Add method to get data sources in a time window and a function to sel…
Browse files Browse the repository at this point in the history
…ect the last version among all the sources with the same (name, type, model)

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>
  • Loading branch information
victorgarcia98 committed Apr 30, 2024
1 parent 7ee030d commit 75ccefd
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 11 deletions.
39 changes: 39 additions & 0 deletions flexmeasures/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,45 @@ def capacity_sensors(db, add_battery_assets, setup_sources):
)


@pytest.fixture(scope="module")
def setup_multiple_sources(db, add_battery_assets):
battery = add_battery_assets["Test battery with dynamic power capacity"]

test_sensor = Sensor(
name="test sensor",
generic_asset=battery,
unit="kW",
event_resolution=timedelta(minutes=15),
)

s1 = DataSource(name="S1", type="type 1")
s2 = DataSource(name="S2", type="type 2")
s3 = DataSource(name="S3", type="type 3")

db.session.add_all([s1, s2, s3, test_sensor])

for s in [s1, s2]:
add_beliefs(
db=db,
sensor=test_sensor,
time_slots=[pd.Timestamp("2024-01-01T10:00:00+01:00")],
values=[1],
source=s,
)

add_beliefs(
db=db,
sensor=test_sensor,
time_slots=[pd.Timestamp("2024-01-02T10:00:00+01:00")],
values=[1],
source=s3,
)

db.session.commit()

return test_sensor, s1, s2, s3


def add_beliefs(
db,
sensor: Sensor,
Expand Down
26 changes: 26 additions & 0 deletions flexmeasures/data/models/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import timely_beliefs as tb

from packaging.version import Version

from flexmeasures.data import db
from flask import current_app
import hashlib
Expand Down Expand Up @@ -363,3 +365,27 @@ def has_attribute(self, attribute: str) -> bool:

def set_attribute(self, attribute: str, value):
self.attributes[attribute] = value


def keep_latest_version(data_sources: list[DataSource]) -> list[DataSource]:
"""
Filters the given list of data sources to only include the latest version
of each unique combination of (name, type, and model).
"""
sources = dict()

for source in data_sources:
key = (source.name, source.type, source.model)
if key not in sources:
sources[key] = source
else:
sources[key] = max(
[source, sources[key]],
key=lambda x: Version(x.version if x.version else "0.0.0"),
)

last_version_sources = []
for source in sources.values():
last_version_sources.append(source)

return last_version_sources
26 changes: 16 additions & 10 deletions flexmeasures/data/models/reporting/pandas_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Any
from datetime import datetime, timedelta
from copy import deepcopy, copy
from packaging.version import Version

from flask import current_app
import timely_beliefs as tb
Expand All @@ -14,6 +13,7 @@
PandasReporterParametersSchema,
)
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.data_sources import keep_latest_version
from flexmeasures.utils.time_utils import server_now


Expand Down Expand Up @@ -60,22 +60,28 @@ def fetch_data(
event_ends_before = _input_search_parameters.pop("event_ends_before", end)
resolution = _input_search_parameters.pop("resolution", resolution)
belief_time = _input_search_parameters.pop("belief_time", belief_time)
sources = None
source = _input_search_parameters.pop(
"source", _input_search_parameters.pop("sources", None)
)

if use_latest_version_only:
sources = sensor.data_sources
if len(sources) > 0:
sources = max(
sources,
key=lambda x: Version(x.version if x.version else "0.0.0"),
)
if use_latest_version_only and source is None:
source = sensor.search_data_sources(
event_starts_after=start,
event_ends_before=end,
source_types=_input_search_parameters.get("source_types"),
exclude_source_types=_input_search_parameters.get(
"exclude_source_types"
),
)
if len(source) > 0:
source = keep_latest_version(source)

bdf = sensor.search_beliefs(
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
resolution=resolution,
beliefs_before=belief_time,
source=sources,
source=source,
**_input_search_parameters,
)

Expand Down
24 changes: 24 additions & 0 deletions flexmeasures/data/models/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,30 @@ def make_hashable(self) -> tuple:

return (self.id, self.attributes, self.generic_asset.attributes)

def search_data_sources(
self,
event_starts_after: datetime_type | None = None,
event_ends_before: datetime_type | None = None,
source_types: list[str] | None = None,
exclude_source_types: list[str] | None = None,
) -> list[DataSource]:

q = select(DataSource).join(TimedBelief).filter(TimedBelief.sensor == self)

if event_starts_after:
q = q.filter(TimedBelief.event_start >= event_starts_after)

if event_ends_before:
q = q.filter(TimedBelief.event_start <= event_ends_before)

if source_types:
q = q.filter(DataSource.type.in_(source_types))

if exclude_source_types:
q = q.filter(DataSource.type.not_in(exclude_source_types))

return db.session.scalars(q).all()


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
"""A timed belief holds a precisely timed record of a belief about an event.
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/schemas/reporting/pandas_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class PandasReporterParametersSchema(ReporterParametersSchema):
# for the single sensors in `input_variables`
start = AwareDateTimeField(required=False)
end = AwareDateTimeField(required=False)
use_latest_version_only = fields.Bool(required=False, defaul=False)
use_latest_version_only = fields.Bool(required=False, default=False)

@validates_schema
def validate_time_parameters(self, data, **kwargs):
Expand Down
15 changes: 15 additions & 0 deletions flexmeasures/data/tests/test_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from flexmeasures.data.models.reporting import Reporter

from flexmeasures.data.models.data_sources import keep_latest_version, DataSource

from datetime import datetime
from pytz import UTC

Expand Down Expand Up @@ -143,3 +145,16 @@ def test_data_generator_save_parameters(
# from the method call (e.g. field `b``)
assert dg2._parameters["b"] == parameters_2["b"]
assert dg2._parameters["start"].isoformat() == parameters_2["start"]


def test_keep_last_version():
s1 = DataSource(name="s1", model="model 1", type="forecaster", version="0.1.0")
s2 = DataSource(name="s1", model="model 1", type="forecaster")

s3 = DataSource(name="s1", model="model 2", type="forecaster")

# the data source with no version is assumed to have version 0.0.0
assert keep_latest_version([s1, s2]) == [s1]

# sources with different models are preserved
assert keep_latest_version([s1, s2, s3]) == [s1, s3]
28 changes: 28 additions & 0 deletions flexmeasures/data/tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,31 @@ def test_persist_beliefs(setup_beliefs, setup_test_data, db):
sensor, source=source, most_recent_beliefs_only=False
)
assert len(bdf) == setup_beliefs * 2


def test_search_sources(db, setup_multiple_sources):
test_sensor, s1, s2, s3 = setup_multiple_sources

def get_sources_names(vec: list[DataSource]) -> list[str]:
return [s.name for s in vec]

# no filter
assert get_sources_names(test_sensor.search_data_sources()) == ["S1", "S2", "S3"]

# exclude results by type
assert get_sources_names(
test_sensor.search_data_sources(exclude_source_types=["type 1"])
) == ["S2", "S3"]

# filter by type
assert get_sources_names(
test_sensor.search_data_sources(source_types=["type 2"])
) == ["S2"]

# time window filter
assert get_sources_names(
test_sensor.search_data_sources(
event_starts_after="2024-01-01T00:00:00+01:00",
event_ends_before="2024-01-02T00:00:00+01:00",
)
) == ["S1", "S2"]

0 comments on commit 75ccefd

Please sign in to comment.