-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
1,134 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
from typing import Optional, Union, Dict | ||
|
||
import pandas as pd | ||
|
||
from flexmeasures.data.schemas.reporting import ReporterConfigSchema | ||
from flexmeasures.data.models.time_series import Sensor | ||
from flexmeasures.data.models.data_sources import DataGeneratorMixin | ||
|
||
|
||
from datetime import datetime, timedelta | ||
|
||
import timely_beliefs as tb | ||
|
||
|
||
class Reporter(DataGeneratorMixin):
    """Superclass for all FlexMeasures Reporters.

    A Reporter fetches beliefs data for one or more sensors (as described by
    its reporter config), computes a report from them (subclasses implement
    `_compute`) and returns the result as a BeliefsDataFrame attached to the
    output sensor.
    """

    __version__ = None
    __author__ = None
    __data_generator_base__ = "Reporter"

    # Sensor that the report output will be saved to.
    sensor: Sensor = None

    # Deserialized (validated) reporter configuration; set by deserialize_config.
    reporter_config: Optional[dict] = None
    # Raw (unvalidated) reporter configuration, as passed to __init__.
    reporter_config_raw: Optional[dict] = None
    schema = ReporterConfigSchema
    # Working storage for fetched inputs: BeliefsDataFrames keyed by alias or
    # "sensor_<id>", plus their data sources keyed by "source_<id>".
    data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None

    def __init__(
        self, sensor: Sensor, reporter_config_raw: Optional[dict] = None
    ) -> None:
        """
        Initialize a new Reporter.

        :param sensor: sensor where the output of the reporter will be saved to.
        :param reporter_config_raw: unserialized configuration of the reporter.
        """
        self.sensor = sensor
        # Normalize falsy values (None, {}) to an empty config dict.
        self.reporter_config_raw = reporter_config_raw or {}

    def fetch_data(
        self,
        start: datetime,
        end: datetime,
        input_resolution: timedelta = None,
        belief_time: datetime = None,
    ):
        """
        Fetch the time beliefs from the database and store them in `self.data`.

        Each entry of `self.tb_query_config` describes one `search_beliefs`
        call; fields missing from a query fall back to the arguments given
        here (start / end / input_resolution / belief_time).
        """
        self.data = {}
        for tb_query in self.tb_query_config:
            _tb_query = tb_query.copy()

            # using start / end when event_starts_after / event_ends_before are not defined
            event_starts_after = _tb_query.pop("event_starts_after", start)
            event_ends_before = _tb_query.pop("event_ends_before", end)
            resolution = _tb_query.pop("resolution", input_resolution)
            # NB: bind to a fresh local name. Rebinding the `belief_time`
            # parameter here (as the original code did) makes one query's
            # override leak into all subsequent queries of this loop.
            query_belief_time = _tb_query.pop("belief_time", belief_time)

            sensor: Sensor = _tb_query.pop("sensor", None)
            alias: str = _tb_query.pop("alias", None)

            bdf = sensor.search_beliefs(
                event_starts_after=event_starts_after,
                event_ends_before=event_ends_before,
                resolution=resolution,
                beliefs_before=query_belief_time,
                **_tb_query,
            )

            # store each data source under "source_<id>"
            for source in bdf.sources.unique():
                self.data[f"source_{source.id}"] = source

            # store the BeliefsDataFrame under its alias, or "sensor_<id>" by default
            if alias:
                self.data[alias] = bdf
            else:
                self.data[f"sensor_{sensor.id}"] = bdf

    def update_attribute(self, attribute, default):
        """Set `attribute` on self, but only when `default` is not None."""
        if default is not None:
            setattr(self, attribute, default)

    def compute(
        self,
        start: datetime,
        end: datetime,
        input_resolution: timedelta = None,
        belief_time: datetime = None,
        **kwargs,
    ) -> tb.BeliefsDataFrame:
        """Trigger the creation of a new report.

        The same object can generate multiple reports with different start, end,
        input_resolution and belief_time values.

        In the future, this function will parse arbitrary input arguments
        defined in a schema.
        """

        # deserialize configuration (only once per Reporter instance)
        if self.reporter_config is None:
            self.deserialize_config()

        # fetch data
        self.fetch_data(start, end, input_resolution, belief_time)

        # compute the report
        result = self._compute(start, end, input_resolution, belief_time)

        # check that the event_resolution of the output BeliefsDataFrame
        # matches that of the output sensor
        assert self.sensor.event_resolution == result.event_resolution, (
            f"The event resolution of the report ({result.event_resolution}) "
            f"does not match that of the output sensor ({self.sensor.event_resolution})."
        )

        # assign the output sensor to the result BeliefsDataFrame
        result.sensor = self.sensor

        return result

    def _compute(
        self,
        start: datetime,
        end: datetime,
        input_resolution: timedelta = None,
        belief_time: datetime = None,
    ) -> tb.BeliefsDataFrame:
        """
        Overwrite with the actual computation of your report.

        :returns BeliefsDataFrame: report as a BeliefsDataFrame.
        """
        raise NotImplementedError()

    def deserialize_config(self):
        """
        Validate the report config against a Marshmallow Schema.

        Ideas:
        - Override this method
        - Call superclass method to apply validation and common variables
          deserialization (see PandasReporter)
        - (Partially) extract the relevant reporter_config parameters into
          class attributes.

        Raises ValidationErrors or ValueErrors.
        """
        self.reporter_config = self.schema.load(
            self.reporter_config_raw
        )  # validate reporter config
        self.tb_query_config = self.reporter_config.get(
            "tb_query_config"
        )  # extracting TimeBelief query configuration parameters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any | ||
from datetime import datetime, timedelta | ||
|
||
from flask import current_app | ||
import timely_beliefs as tb | ||
|
||
from flexmeasures.data.models.reporting import Reporter | ||
from flexmeasures.data.schemas.reporting.pandas_reporter import ( | ||
PandasReporterConfigSchema, | ||
) | ||
|
||
|
||
class PandasReporter(Reporter):
    """This reporter applies a series of pandas methods on the fetched sensor
    data, as described by the `transformations` field of its reporter config,
    and returns the dataframe named by `final_df_output` as the report.
    """

    __version__ = "1"
    __author__ = None
    schema = PandasReporterConfigSchema()

    # List of transformation specs, each with a "method" key and optional
    # "args" / "kwargs" / "df_input" / "df_output"; set by deserialize_config.
    transformations: list[dict[str, Any]] = None
    # Key in self.data of the dataframe returned as the final report.
    final_df_output: str = None

    def deserialize_config(self):
        """Validate the config, then extract the PandasReporter specific fields."""
        # call super class deserialize_config
        super().deserialize_config()

        # extract PandasReporter specific fields
        self.transformations = self.reporter_config.get("transformations")
        self.final_df_output = self.reporter_config.get("final_df_output")

    def _compute(
        self,
        start: datetime,
        end: datetime,
        input_resolution: timedelta = None,
        belief_time: datetime = None,
    ) -> tb.BeliefsDataFrame:
        """
        This method applies the transformations and outputs the dataframe
        defined in the `final_df_output` field of the report_config.
        """

        # apply pandas transformations to the dataframes in `self.data`
        self._apply_transformations()

        final_output = self.data[self.final_df_output]

        return final_output

    def get_object_or_literal(self, value: Any, method: str) -> Any:
        """This method allows using the dataframes as inputs of the Pandas methods that
        are run in the transformations. Make sure that they have been created before accessed.

        This works by putting the symbol `@` in front of the name of the dataframe that we want to reference.
        For instance, to reference the dataframe test_df, which lives in self.data, we would do `@test_df`.

        This functionality is disabled for methods `eval`and `query` to avoid interfering their internal behaviour
        given that they also use `@` to allow using local variables.

        Example:
        >>> self.get_object_or_literal(["@df_wind", "@df_solar"], "sum")
        [<BeliefsDataFrame for Wind Turbine sensor>, <BeliefsDataFrame for Solar Panel sensor>]
        """

        if method in ["eval", "query"]:
            # eval/query use `@` themselves (for local variables), so leave
            # such strings untouched and only log the situation.
            if isinstance(value, str) and value.startswith("@"):
                current_app.logger.debug(
                    "Cannot reference objects in self.data using the method eval or query. That is because these methods use the symbol `@` to make reference to local variables."
                )
            return value

        if isinstance(value, str) and value.startswith("@"):
            value = value.replace("@", "")
            return self.data[value]

        if isinstance(value, list):
            return [self.get_object_or_literal(v, method) for v in value]

        return value

    def _process_pandas_args(self, args: list, method: str) -> list:
        """Return `args` with every string "@<object-name>" replaced by the actual
        object stored in `self.data["<object-name>"]`.

        NB: builds a new list instead of mutating `args` in place. The list comes
        straight from the stored transformation config, so in-place replacement
        would bake dataframe objects into the config and leak stale data from one
        `compute` call into the next.
        """
        return [self.get_object_or_literal(arg, method) for arg in args]

    def _process_pandas_kwargs(self, kwargs: dict, method: str) -> dict:
        """Return `kwargs` with every string value "@<object-name>" replaced by the
        actual object stored in `self.data["<object-name>"]`.

        NB: builds a new dict instead of mutating `kwargs` in place, for the same
        reason as `_process_pandas_args` (the dict belongs to the stored config).
        """
        return {k: self.get_object_or_literal(v, method) for k, v in kwargs.items()}

    def _apply_transformations(self):
        """Convert the series using the given list of transformation specs, which is called in the order given.

        Each transformation specs should include a 'method' key specifying a method name of a Pandas DataFrame.

        Optionally, 'args' and 'kwargs' keys can be specified to pass on arguments or keyword arguments to the given method.

        All data exchange is made through the dictionary `self.data`. The superclass Reporter already fetches BeliefsDataFrames of
        the sensors and saves them in the self.data dictionary fields `sensor_<sensor_id>`. In case you need to perform complex operations on dataframes, you can
        split the operations in several steps and saving the intermediate results using the parameters `df_input` and `df_output` for the
        input and output dataframes, respectively.

        Example:

        The example below converts from hourly meter readings in kWh to electricity demand in kW.
            transformations = [
                {"method": "diff"},
                {"method": "shift", "kwargs": {"periods": -1}},
                {"method": "head", "args": [-1]},
            ],
        """

        previous_df = None

        for transformation in self.transformations:
            df_input = transformation.get(
                "df_input", previous_df
            )  # default is using the previous transformation output
            df_output = transformation.get(
                "df_output", df_input
            )  # default is OUTPUT = INPUT.method()

            method = transformation.get("method")
            args = self._process_pandas_args(transformation.get("args", []), method)
            kwargs = self._process_pandas_kwargs(
                transformation.get("kwargs", {}), method
            )

            # dispatch the named pandas method on the input dataframe
            self.data[df_output] = getattr(self.data[df_input], method)(*args, **kwargs)

            previous_df = df_output
Empty file.
Oops, something went wrong.