Skip to content

Commit

Permalink
integrate id and class into a dict
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Garcia Reolid <victor@seita.nl>
  • Loading branch information
victorgarcia98 committed Oct 30, 2023
1 parent 568a83b commit 9718261
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 49 deletions.
6 changes: 3 additions & 3 deletions flexmeasures/api/v3_0/tests/test_sensor_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def test_trigger_and_get_schedule_with_unknown_prices(
len(app.queues["scheduling"]) == 1
) # only 1 schedule should be made for 1 asset
job = app.queues["scheduling"].jobs[0]
assert job.kwargs["asset_or_sensor_id"] == sensor.id
assert job.kwargs["asset_or_sensor"]["id"] == sensor.id
assert job.kwargs["start"] == parse_datetime(message["start"])
assert job.id == job_id

Expand Down Expand Up @@ -216,7 +216,7 @@ def test_trigger_and_get_schedule(
len(app.queues["scheduling"]) == 1
) # only 1 schedule should be made for 1 asset
job = app.queues["scheduling"].jobs[0]
assert job.kwargs["asset_or_sensor_id"] == sensor.id
assert job.kwargs["asset_or_sensor"]["id"] == sensor.id
assert job.kwargs["start"] == parse_datetime(message["start"])
assert job.id == job_id

Expand Down Expand Up @@ -400,7 +400,7 @@ def test_get_schedule_fallback(
len(app.queues["scheduling"]) == 1
) # only 1 schedule should be made for 1 asset
job = app.queues["scheduling"].jobs[0]
assert job.kwargs["asset_or_sensor_id"] == charging_station.id
assert job.kwargs["asset_or_sensor"]["id"] == charging_station.id
assert job.kwargs["start"] == parse_datetime(message["start"])
assert job.id == job_id

Expand Down
15 changes: 5 additions & 10 deletions flexmeasures/cli/data_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
from flexmeasures.utils.time_utils import server_now, apply_offset_chain
from flexmeasures.utils.unit_utils import convert_units, ur
from flexmeasures.data.utils import save_to_db
from flexmeasures.data.services.utils import get_asset_or_sensor_ref
from flexmeasures.data.models.reporting import Reporter
from flexmeasures.data.models.reporting.profit import ProfitOrLossReporter
from timely_beliefs import BeliefsDataFrame
Expand Down Expand Up @@ -1205,18 +1206,15 @@ def add_schedule_for_storage(
},
)
if as_job:
job = create_scheduling_job(
asset_or_sensor_id=power_sensor, **scheduling_kwargs
)
job = create_scheduling_job(asset_or_sensor=power_sensor, **scheduling_kwargs)
if job:
click.secho(
f"New scheduling job {job.id} has been added to the queue.",
**MsgStyle.SUCCESS,
)
else:
success = make_schedule(
asset_or_sensor_id=power_sensor.id,
entity_type=Sensor.__name__,
asset_or_sensor=get_asset_or_sensor_ref(power_sensor),
**scheduling_kwargs,
)
if success:
Expand Down Expand Up @@ -1342,18 +1340,15 @@ def add_schedule_process(
}

if as_job:
job = create_scheduling_job(
asset_or_sensor_id=power_sensor, **scheduling_kwargs
)
job = create_scheduling_job(asset_or_sensor=power_sensor, **scheduling_kwargs)
if job:
click.secho(
f"New scheduling job {job.id} has been added to the queue.",
**MsgStyle.SUCCESS,
)
else:
success = make_schedule(
asset_or_sensor_id=power_sensor.id,
entity_type=Sensor.__name__,
asset_or_sensor=get_asset_or_sensor_ref(power_sensor),
**scheduling_kwargs,
)
if success:
Expand Down
9 changes: 5 additions & 4 deletions flexmeasures/data/models/planning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ class Scheduler:

# set to True if the Scheduler supports triggering on an Asset or False
# if the Scheduler expects a Sensor
requires_asset = False
supports_scheduling_an_asset = False

def __init__(
self,
asset_or_sensor: Asset | Sensor | None = None,
sensor: Optional[Sensor] = None, # deprecated
start: Optional[datetime] = None,
end: Optional[datetime] = None,
resolution: Optional[timedelta] = None,
sensor: Optional[Sensor] = None,
belief_time: Optional[datetime] = None,
asset_or_sensor: Asset | Sensor | None = None,
round_to_decimals: Optional[int] = 6,
flex_model: Optional[dict] = None,
flex_context: Optional[dict] = None,
Expand All @@ -92,7 +92,7 @@ def __init__(
)
asset_or_sensor = sensor

if self.requires_asset and isinstance(asset_or_sensor, Asset):
if self.supports_scheduling_an_asset and isinstance(asset_or_sensor, Sensor):
raise WrongEntityException(
f"The scheduler class {self.__class__.__name__} expects an Asset object but a Sensor was provided."
)
Expand All @@ -109,6 +109,7 @@ def __init__(
f"The scheduler class {self.__class__.__name__} expects an Asset or Sensor objects but an object of class `{asset_or_sensor.__class__.__name__}` was provided."
)

self.asset_or_sensor = asset_or_sensor
self.start = start
self.end = end
self.resolution = resolution
Expand Down
56 changes: 25 additions & 31 deletions flexmeasures/data/services/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.utils import get_data_source, save_to_db
from flexmeasures.utils.time_utils import server_now
from flexmeasures.data.services.utils import job_cache
from flexmeasures.data.services.utils import (
job_cache,
get_asset_or_sensor_ref,
get_asset_or_sensor_from_ref,
)


def load_custom_scheduler(scheduler_specs: dict) -> type:
Expand Down Expand Up @@ -93,29 +97,14 @@ def load_custom_scheduler(scheduler_specs: dict) -> type:
return scheduler_class


def get_asset_or_sensor(id: int, entity_type):
if entity_type == Asset.__name__:
klass = Asset
elif entity_type == Sensor.__name__:
klass = Sensor
else:
raise ValueError(
f"Unregonized entity_type `{entity_type}`. Please, consider using GenericAsset or Sensor."
)

return klass.query.get(id)


def trigger_optional_fallback(job, connection, type, value, traceback):
"""Create a fallback schedule job when the error is of type InfeasibleProblemException"""

job.meta["exception"] = value
job.save_meta()

if type is InfeasibleProblemException:
asset_or_sensor_id = job.meta.get("asset_or_sensor_id")
entity_type = job.meta.get("entity_type")
asset_or_sensor = get_asset_or_sensor(asset_or_sensor_id, entity_type)
asset_or_sensor = get_asset_or_sensor_from_ref(job.meta.get("asset_or_sensor"))

scheduler_kwargs = job.meta["scheduler_kwargs"]

Expand Down Expand Up @@ -206,8 +195,7 @@ def create_scheduling_job(
job = Job.create(
make_schedule,
kwargs=dict(
asset_or_sensor_id=asset_or_sensor.id,
entity_type=asset_or_sensor.__class__.__name__,
asset_or_sensor=get_asset_or_sensor_ref(asset_or_sensor),
scheduler_specs=scheduler_specs,
**scheduler_kwargs,
),
Expand All @@ -226,8 +214,7 @@ def create_scheduling_job(
on_failure=Callback(trigger_optional_fallback),
)

job.meta["asset_or_sensor_id"] = asset_or_sensor.id
job.meta["entity_type"] = asset_or_sensor.__class__.__name__
job.meta["asset_or_sensor"] = get_asset_or_sensor_ref(asset_or_sensor)
job.meta["scheduler_kwargs"] = scheduler_kwargs
job.save_meta()

Expand All @@ -242,10 +229,11 @@ def create_scheduling_job(


def make_schedule(
start: datetime,
end: datetime,
resolution: timedelta,
asset_or_sensor_id: int | None = None,
sensor_id: int | None = None,
start: datetime | None = None,
end: datetime | None = None,
resolution: timedelta | None = None,
asset_or_sensor: dict | None = None,
entity_type: str | None = None,
belief_time: datetime | None = None,
flex_model: dict | None = None,
Expand All @@ -267,7 +255,13 @@ def make_schedule(
# https://docs.sqlalchemy.org/en/13/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork
db.engine.dispose()

asset_or_sensor = get_asset_or_sensor(asset_or_sensor_id, entity_type)
if sensor_id is not None:
current_app.logger.warning(
"The `sensor_id` keyword argument is deprecated. Please, consider using the argument `asset_or_sensor`."
)
asset_or_sensor = {"class": "Sensor", "id": sensor_id}

asset_or_sensor = get_asset_or_sensor_from_ref(asset_or_sensor)

rq_job = get_current_job()
if rq_job:
Expand All @@ -286,10 +280,10 @@ def make_schedule(
if belief_time is None:
belief_time = server_now()
scheduler: Scheduler = scheduler_class(
asset_or_sensor,
start,
end,
resolution,
asset_or_sensor=asset_or_sensor,
start=start,
end=end,
resolution=resolution,
belief_time=belief_time,
flex_model=flex_model,
flex_context=flex_context,
Expand Down Expand Up @@ -352,7 +346,7 @@ def make_schedule(

def find_scheduler_class(asset_or_sensor: Asset | Sensor) -> type:
"""
Find out which scheduler to use, given a asset or sensor.
Find out which scheduler to use, given an asset or sensor.
This will morph into a logic store utility, and schedulers should be registered for asset types there,
instead of this fixed lookup logic.
"""
Expand Down
40 changes: 39 additions & 1 deletion flexmeasures/data/services/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,49 @@
from flask import current_app
from rq.job import Job

from flexmeasures import Sensor
from flexmeasures import Sensor, Asset
from flexmeasures.data import db
from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType


def get_asset_or_sensor_ref(asset_or_sensor: Asset | Sensor) -> dict:
if hasattr(asset_or_sensor, "id"):
return {"id": asset_or_sensor.id, "class": asset_or_sensor.__class__.__name__}
else:
return None


def get_asset_or_sensor_from_ref(asset_or_sensor: dict):
"""
Fetch Asset or Sensor object described by the asset_or_sensor dictionary.
This dictionary needs to contain the class name and row id.
We currently cannot simplify this by just passing around the object
instead of the class name: i.e. the function arguments need to
be serializable as job parameters.
Examples:
>> get_asset_or_sensor({"class" : "Asset", "id" : 1})
Asset(id=1)
>> get_asset_or_sensor({"class" : "Sensor", "id" : 2})
Sensor(id=2)
"""
if asset_or_sensor["class"] == Asset.__name__:
klass = Asset
elif asset_or_sensor["class"] == Sensor.__name__:
klass = Sensor
else:
raise ValueError(
f"Unrecognized class `{asset_or_sensor['class']}`. Please, consider using GenericAsset or Sensor."
)

return klass.query.get(asset_or_sensor["id"])


def get_or_create_model(
model_class: Type[GenericAsset | GenericAssetType | Sensor], **kwargs
) -> GenericAsset | GenericAssetType | Sensor:
Expand Down

0 comments on commit 9718261

Please sign in to comment.