Skip to content

Commit

Permalink
Merge dd51388 into e4c77cc
Browse files Browse the repository at this point in the history
  • Loading branch information
Flix6x committed Jun 4, 2023
2 parents e4c77cc + dd51388 commit ca153c4
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 2,056 deletions.
7 changes: 1 addition & 6 deletions documentation/api/notation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -380,22 +380,17 @@ For example, to obtain data originating from data source 42, include the followi
Data source IDs can be found by hovering over data in charts.

.. note:: Older API version (< 3) accepted user IDs (integers), account roles (strings) and lists thereof, instead of data source IDs (integers).


.. _units:

Units
^^^^^

From API version 3 onwards, we are much more flexible with sent units.
The FlexMeasures API is quite flexible with sent units.
A valid unit for timeseries data is any unit that is convertible to the configured sensor unit registered in FlexMeasures.
So, for example, you can send timeseries data with "W" unit to a "kW" sensor.
And if you wish to do so, you can even send a timeseries with "kWh" unit to a "kW" sensor.
In this case, FlexMeasures will convert the data using the resolution of the timeseries.

For API versions 1 and 2, the unit sent needs to be an exact match with the sensor unit, and only "MW" is allowed for power sensors.

.. _signs:

Signs of power values
Expand Down
131 changes: 3 additions & 128 deletions flexmeasures/api/common/utils/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from timely_beliefs.beliefs.classes import BeliefsDataFrame
from typing import List, Sequence, Tuple, Union
import copy
from datetime import datetime, timedelta
from json import loads as parse_json, JSONDecodeError

Expand All @@ -15,13 +14,10 @@
import timely_beliefs as tb

from flexmeasures.data import db
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
from flexmeasures.data.models.markets import Price
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
from flexmeasures.data.utils import save_to_session, save_to_db as modern_save_to_db
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.weather import WeatherSensor
from flexmeasures.data.utils import save_to_db as modern_save_to_db
from flexmeasures.api.common.responses import (
invalid_replacement,
unrecognized_sensor,
Expand Down Expand Up @@ -244,52 +240,6 @@ def unique_ever_seen(iterable: Sequence, selector: Sequence):
return u, s


def message_replace_name_with_ea(message_with_connections_as_asset_names: dict) -> dict:
"""
For each connection in the message specified by a name, replace that name with the correct entity address.
TODO: Deprecated. This function is now only used in tests of deprecated API versions and should go (also asset_replace_name_with_id)
"""
message_with_connections_as_eas = copy.deepcopy(
message_with_connections_as_asset_names
)
if "connection" in message_with_connections_as_asset_names:
message_with_connections_as_eas["connection"] = asset_replace_name_with_id(
parse_as_list( # type:ignore
message_with_connections_as_eas["connection"], of_type=str
)
)
elif "connections" in message_with_connections_as_asset_names:
message_with_connections_as_eas["connections"] = asset_replace_name_with_id(
parse_as_list( # type:ignore
message_with_connections_as_eas["connections"], of_type=str
)
)
elif "groups" in message_with_connections_as_asset_names:
for i, group in enumerate(message_with_connections_as_asset_names["groups"]):
if "connection" in group:
message_with_connections_as_eas["groups"][i][
"connection"
] = asset_replace_name_with_id(
parse_as_list(group["connection"], of_type=str) # type:ignore
)
elif "connections" in group:
message_with_connections_as_eas["groups"][i][
"connections"
] = asset_replace_name_with_id(
parse_as_list(group["connections"], of_type=str) # type:ignore
)
return message_with_connections_as_eas


def asset_replace_name_with_id(connections_as_name: List[str]) -> List[str]:
"""Look up the owner and id given the asset name and construct a type 1 USEF entity address."""
connections_as_ea = []
for asset_name in connections_as_name:
asset = Asset.query.filter(Asset.name == asset_name).one_or_none()
connections_as_ea.append(asset.entity_address)
return connections_as_ea


def get_sensor_by_generic_asset_type_and_location(
generic_asset_type_name: str, latitude: float = 0, longitude: float = 0
) -> Union[Sensor, ResponseTuple]:
Expand Down Expand Up @@ -382,81 +332,6 @@ def save_and_enqueue(
return invalid_replacement()


def save_to_db(
timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
forecasting_jobs: List[Job] = [],
save_changed_beliefs_only: bool = True,
) -> ResponseTuple:
"""Put the timed values into the database and enqueue forecasting jobs.
Data can only be replaced on servers in play mode.
TODO: remove this legacy function in its entirety (announced v0.8.0)
:param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
:param forecasting_jobs: list of forecasting Jobs for redis queues.
:param save_changed_beliefs_only: if True, beliefs that are already stored in the database with an earlier belief time are dropped.
:returns: ResponseTuple
"""

import warnings

warnings.warn(
"The method api.common.utils.api_utils.save_to_db is deprecated. Check out the following replacements:"
"- [recommended option] to store BeliefsDataFrames only, switch to data.utils.save_to_db"
"- to store BeliefsDataFrames and enqueue jobs, switch to api.common.utils.api_utils.save_and_enqueue"
)

if isinstance(timed_values, BeliefsDataFrame):

if save_changed_beliefs_only:
# Drop beliefs that haven't changed
timed_values = drop_unchanged_beliefs(timed_values)

# Work around bug in which groupby still introduces an index level, even though we asked it not to
if None in timed_values.index.names:
timed_values.index = timed_values.index.droplevel(None)

if timed_values.empty:
current_app.logger.debug("Nothing new to save")
return already_received_and_successfully_processed()

current_app.logger.info("SAVING TO DB AND QUEUEING...")
try:
if isinstance(timed_values, BeliefsDataFrame):
TimedBelief.add_to_session(
session=db.session, beliefs_data_frame=timed_values
)
else:
save_to_session(timed_values)
db.session.flush()
[current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs]
db.session.commit()
return request_processed()
except IntegrityError as e:
current_app.logger.warning(e)
db.session.rollback()

# Possibly allow data to be replaced depending on config setting
if current_app.config.get("FLEXMEASURES_ALLOW_DATA_OVERWRITE", False):
if isinstance(timed_values, BeliefsDataFrame):
TimedBelief.add_to_session(
session=db.session,
beliefs_data_frame=timed_values,
allow_overwrite=True,
)
else:
save_to_session(timed_values, overwrite=True)
[
current_app.queues["forecasting"].enqueue_job(job)
for job in forecasting_jobs
]
db.session.commit()
return request_processed()
else:
return already_received_and_successfully_processed()


def determine_belief_timing(
event_values: list,
start: datetime,
Expand Down
65 changes: 49 additions & 16 deletions flexmeasures/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from flask import request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_security import roles_accepted
from timely_beliefs.sensors.func_store.knowledge_horizons import x_days_ago_at_y_oclock

from werkzeug.exceptions import (
InternalServerError,
BadRequest,
Expand All @@ -28,7 +30,6 @@
from flexmeasures.data.models.generic_assets import GenericAssetType, GenericAsset
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.planning.utils import initialize_index
from flexmeasures.data.models.markets import Market, MarketType
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.user import User, Account, AccountRole

Expand Down Expand Up @@ -231,32 +232,39 @@ def create_roles_users(db, test_accounts) -> dict[str, User]:


@pytest.fixture(scope="module")
def setup_markets(db) -> dict[str, Market]:
def setup_markets(db) -> dict[str, Sensor]:
return create_test_markets(db)


@pytest.fixture(scope="function")
def setup_markets_fresh_db(fresh_db) -> dict[str, Market]:
def setup_markets_fresh_db(fresh_db) -> dict[str, Sensor]:
return create_test_markets(fresh_db)


def create_test_markets(db) -> dict[str, Market]:
def create_test_markets(db) -> dict[str, Sensor]:
"""Create the epex_da market."""

day_ahead = MarketType(
day_ahead = GenericAssetType(
name="day_ahead",
daily_seasonality=True,
weekly_seasonality=True,
yearly_seasonality=True,
)
db.session.add(day_ahead)
epex_da = Market(
epex = GenericAsset(
name="epex",
generic_asset_type=day_ahead,
)
epex_da = Sensor(
name="epex_da",
market_type_name="day_ahead",
generic_asset=epex,
event_resolution=timedelta(hours=1),
unit="EUR/MWh",
knowledge_horizon_fnc="x_days_ago_at_y_oclock",
knowledge_horizon_par={"x": 1, "y": 12, "z": "Europe/Paris"},
knowledge_horizon=(
x_days_ago_at_y_oclock,
{"x": 1, "y": 12, "z": "Europe/Paris"},
),
attributes=dict(
daily_seasonality=True,
weekly_seasonality=True,
yearly_seasonality=True,
),
)
db.session.add(epex_da)
return {"epex_da": epex_da}
Expand Down Expand Up @@ -398,6 +406,31 @@ def create_test_asset_types(db) -> dict[str, AssetType]:
@pytest.fixture(scope="module")
def setup_assets(
db, setup_roles_users, setup_markets, setup_sources, setup_asset_types
) -> dict[str, Asset]:
return create_assets(
db, setup_roles_users, setup_markets, setup_sources, setup_asset_types
)


@pytest.fixture(scope="function")
def setup_assets_fresh_db(
fresh_db,
setup_roles_users_fresh_db,
setup_markets_fresh_db,
setup_sources_fresh_db,
setup_asset_types_fresh_db,
) -> dict[str, Asset]:
return create_assets(
fresh_db,
setup_roles_users_fresh_db,
setup_markets_fresh_db,
setup_sources_fresh_db,
setup_asset_types_fresh_db,
)


def create_assets(
db, setup_roles_users, setup_markets, setup_sources, setup_asset_types
) -> dict[str, Asset]:
"""Add assets to known test users.
Deprecated. Remove with Asset model."""
Expand Down Expand Up @@ -527,7 +560,7 @@ def add_market_prices(
belief_horizon=timedelta(hours=0),
event_value=val,
source=setup_sources["Seita"],
sensor=setup_markets["epex_da"].corresponding_sensor,
sensor=setup_markets["epex_da"],
)
for dt, val in zip(time_slots, values)
]
Expand All @@ -546,12 +579,12 @@ def add_market_prices(
belief_horizon=timedelta(hours=0),
event_value=val,
source=setup_sources["Seita"],
sensor=setup_markets["epex_da"].corresponding_sensor,
sensor=setup_markets["epex_da"],
)
for dt, val in zip(time_slots, values)
]
db.session.add_all(day2_beliefs)
return {"epex_da": setup_markets["epex_da"].corresponding_sensor}
return {"epex_da": setup_markets["epex_da"]}


@pytest.fixture(scope="module")
Expand Down
1 change: 0 additions & 1 deletion flexmeasures/data/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def configure_db_for(app: Flask):
# you will have to import them first before calling configure_db().
from flexmeasures.data.models import ( # noqa: F401
time_series,
markets,
assets,
weather,
data_sources,
Expand Down
Loading

0 comments on commit ca153c4

Please sign in to comment.