Skip to content

Commit

Permalink
Merge b18687d into da6fdf7
Browse files Browse the repository at this point in the history
  • Loading branch information
Flix6x committed May 22, 2023
2 parents da6fdf7 + b18687d commit 58ef917
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 817 deletions.
131 changes: 3 additions & 128 deletions flexmeasures/api/common/utils/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from timely_beliefs.beliefs.classes import BeliefsDataFrame
from typing import List, Sequence, Tuple, Union
import copy
from datetime import datetime, timedelta
from json import loads as parse_json, JSONDecodeError

Expand All @@ -15,13 +14,10 @@
import timely_beliefs as tb

from flexmeasures.data import db
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
from flexmeasures.data.models.markets import Price
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
from flexmeasures.data.utils import save_to_session, save_to_db as modern_save_to_db
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.weather import WeatherSensor
from flexmeasures.data.utils import save_to_db as modern_save_to_db
from flexmeasures.api.common.responses import (
invalid_replacement,
unrecognized_sensor,
Expand Down Expand Up @@ -244,52 +240,6 @@ def unique_ever_seen(iterable: Sequence, selector: Sequence):
return u, s


def message_replace_name_with_ea(message_with_connections_as_asset_names: dict) -> dict:
    """Return a deep copy of the given message in which every connection that is
    specified by an asset name has been replaced by its USEF entity address.

    Exactly one of the "connection", "connections" or "groups" fields is
    converted (in that order of precedence), mirroring how clients send them.

    TODO: Deprecated. This function is now only used in tests of deprecated API
          versions and should go (also asset_replace_name_with_id).
    """
    converted_message = copy.deepcopy(message_with_connections_as_asset_names)
    # Top-level fields take precedence; only the first one present is handled.
    for field in ("connection", "connections"):
        if field in converted_message:
            converted_message[field] = asset_replace_name_with_id(
                parse_as_list(converted_message[field], of_type=str)  # type:ignore
            )
            return converted_message
    # No top-level connection field: look inside each group, if any.
    if "groups" in converted_message:
        for i, group in enumerate(converted_message["groups"]):
            for field in ("connection", "connections"):
                if field in group:
                    converted_message["groups"][i][field] = asset_replace_name_with_id(
                        parse_as_list(group[field], of_type=str)  # type:ignore
                    )
                    break
    return converted_message


def asset_replace_name_with_id(connections_as_name: List[str]) -> List[str]:
    """Look up the owner and id given the asset name and construct a type 1 USEF entity address.

    :param connections_as_name: asset names to resolve
    :returns: entity addresses, in the same order as the given names
    :raises ValueError: if no asset exists with one of the given names
    """
    connections_as_ea = []
    for asset_name in connections_as_name:
        asset = Asset.query.filter(Asset.name == asset_name).one_or_none()
        # one_or_none() yields None for unknown names; fail with a clear message
        # instead of an opaque AttributeError on None.entity_address.
        if asset is None:
            raise ValueError(f"No asset found with name '{asset_name}'.")
        connections_as_ea.append(asset.entity_address)
    return connections_as_ea


def get_sensor_by_generic_asset_type_and_location(
generic_asset_type_name: str, latitude: float = 0, longitude: float = 0
) -> Union[Sensor, ResponseTuple]:
Expand Down Expand Up @@ -382,81 +332,6 @@ def save_and_enqueue(
return invalid_replacement()


def save_to_db(
    timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
    forecasting_jobs: Union[List[Job], None] = None,
    save_changed_beliefs_only: bool = True,
) -> ResponseTuple:
    """Put the timed values into the database and enqueue forecasting jobs.

    Data can only be replaced on servers in play mode.

    TODO: remove this legacy function in its entirety (announced v0.8.0)

    :param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
    :param forecasting_jobs: list of forecasting Jobs for redis queues (defaults to no jobs)
    :param save_changed_beliefs_only: if True, beliefs that are already stored in the database with an earlier belief time are dropped.
    :returns: ResponseTuple
    """

    import warnings

    warnings.warn(
        "The method api.common.utils.api_utils.save_to_db is deprecated. Check out the following replacements:"
        "- [recommended option] to store BeliefsDataFrames only, switch to data.utils.save_to_db"
        "- to store BeliefsDataFrames and enqueue jobs, switch to api.common.utils.api_utils.save_and_enqueue"
    )

    # Avoid a mutable default argument; treat None as "no jobs to enqueue".
    if forecasting_jobs is None:
        forecasting_jobs = []

    if isinstance(timed_values, BeliefsDataFrame):

        if save_changed_beliefs_only:
            # Drop beliefs that haven't changed
            timed_values = drop_unchanged_beliefs(timed_values)

        # Work around bug in which groupby still introduces an index level, even though we asked it not to
        if None in timed_values.index.names:
            timed_values.index = timed_values.index.droplevel(None)

        if timed_values.empty:
            current_app.logger.debug("Nothing new to save")
            return already_received_and_successfully_processed()

    current_app.logger.info("SAVING TO DB AND QUEUEING...")
    try:
        if isinstance(timed_values, BeliefsDataFrame):
            TimedBelief.add_to_session(
                session=db.session, beliefs_data_frame=timed_values
            )
        else:
            save_to_session(timed_values)
        db.session.flush()
        # Enqueue with a plain loop: list comprehensions are for building lists,
        # not for side effects.
        for job in forecasting_jobs:
            current_app.queues["forecasting"].enqueue_job(job)
        db.session.commit()
        return request_processed()
    except IntegrityError as e:
        current_app.logger.warning(e)
        db.session.rollback()

        # Possibly allow data to be replaced depending on config setting
        if current_app.config.get("FLEXMEASURES_ALLOW_DATA_OVERWRITE", False):
            if isinstance(timed_values, BeliefsDataFrame):
                TimedBelief.add_to_session(
                    session=db.session,
                    beliefs_data_frame=timed_values,
                    allow_overwrite=True,
                )
            else:
                save_to_session(timed_values, overwrite=True)
            for job in forecasting_jobs:
                current_app.queues["forecasting"].enqueue_job(job)
            db.session.commit()
            return request_processed()
        else:
            return already_received_and_successfully_processed()


def determine_belief_timing(
event_values: list,
start: datetime,
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/models/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def __init__(self, **kwargs):
"assets", lazy=True, cascade="all, delete-orphan", passive_deletes=True
),
)
market = db.relationship("Market", backref=db.backref("assets", lazy=True))
# market = db.relationship("Market", backref=db.backref("assets", lazy=True))

def latest_state(self, event_ends_before: Optional[datetime] = None) -> "Power":
"""Search the most recent event for this sensor, optionally before some datetime."""
Expand Down

0 comments on commit 58ef917

Please sign in to comment.