Skip to content

Commit

Permalink
Merge 06bff04 into edf38a8
Browse files Browse the repository at this point in the history
  • Loading branch information
Flix6x committed Jun 5, 2024
2 parents edf38a8 + 06bff04 commit eaf5f4d
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 11 deletions.
212 changes: 207 additions & 5 deletions flexmeasures/api/v3_0/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,43 @@
from datetime import datetime, timedelta
import json

from flask import current_app
import isodate
from flask import current_app, url_for
from flask_classful import FlaskView, route
from flask_security import auth_required
from flask_json import as_json
from marshmallow import fields, ValidationError
from rq.job import Job, NoSuchJobError
from webargs.flaskparser import use_kwargs, use_args
from sqlalchemy import select, delete

from flexmeasures.auth.decorators import permission_required_for_context
from flexmeasures.data import db
from flexmeasures.data.models.user import Account
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.schemas.generic_assets import (
GenericAssetSchema as AssetSchema,
GenericAssetIdField as AssetIdField,
)
from flexmeasures.data.queries.utils import simplify_index
from flexmeasures.data.schemas import AssetIdField
from flexmeasures.data.schemas.generic_assets import GenericAssetSchema as AssetSchema
from flexmeasures.data.schemas.scheduling import AssetTriggerSchema
from flexmeasures.data.schemas.times import AwareDateTimeField
from flexmeasures.data.services.scheduling import (
create_sequential_scheduling_job,
get_data_source_for_job,
)
from flexmeasures.data.services.utils import get_asset_or_sensor_from_ref
from flexmeasures.api.common.schemas.users import AccountIdField
from flexmeasures.api.common.responses import (
fallback_schedule_redirect,
invalid_flex_config,
request_processed,
unknown_schedule,
unrecognized_event,
)
from flexmeasures.api.common.utils.validators import (
optional_duration_accepted,
)
from flexmeasures.utils.coding_utils import flatten_unique
from flexmeasures.utils.time_utils import duration_isoformat
from flexmeasures.ui.utils.view_utils import set_session_variables


Expand Down Expand Up @@ -516,3 +526,195 @@ def trigger_schedule(
response = dict(schedule=jobs[-1].id)
d, s = request_processed()
return dict(**response, **d), s

@route("/<id>/schedules/<uuid>", methods=["GET"])
@use_kwargs(
    {
        "asset": AssetIdField(data_key="id", status_if_not_found=404),
        "job_id": fields.Str(data_key="uuid"),
    },
    location="path",
)
@optional_duration_accepted(
    timedelta(hours=6)
)  # todo: make this a Marshmallow field
@permission_required_for_context("read", ctx_arg_name="asset")
def get_schedule(  # noqa: C901
    self, asset: GenericAsset, job_id: str, duration: timedelta, **kwargs
):
    """Get a schedule from FlexMeasures for multiple devices.

    .. :quickref: Schedule; Download schedule from the platform for multiple devices

    **Optional fields**

    - "duration" (6 hours by default; can be increased to plan further into the future)

    **Example response**

    This message contains a schedule indicating two devices to consume at various power
    rates from 10am UTC onwards for a duration of 45 minutes.

    .. sourcecode:: json

        {
            "schedule": [
                {
                    "sensor": 1,
                    "values": [
                        2.15,
                        3,
                        2
                    ],
                    "start": "2015-06-02T10:00:00+00:00",
                    "duration": "PT45M",
                    "unit": "MW"
                },
                {
                    "sensor": 2,
                    "values": [
                        2.15,
                        3,
                        2
                    ],
                    "start": "2015-06-02T10:00:00+00:00",
                    "duration": "PT45M",
                    "unit": "MW"
                }
            ]
        }

    :reqheader Authorization: The authentication token
    :reqheader Content-Type: application/json
    :resheader Content-Type: application/json
    :status 200: PROCESSED
    :status 400: INVALID_TIMEZONE, INVALID_DOMAIN, INVALID_UNIT, UNKNOWN_SCHEDULE, UNRECOGNIZED_CONNECTION_GROUP
    :status 401: UNAUTHORIZED
    :status 403: INVALID_SENDER
    :status 405: INVALID_METHOD
    :status 422: UNPROCESSABLE_ENTITY
    """

    # Cap the requested duration at the platform-wide planning horizon.
    planning_horizon = min(  # type: ignore
        duration, current_app.config.get("FLEXMEASURES_PLANNING_HORIZON")
    )

    # Look up the scheduling job
    connection = current_app.queues["scheduling"].connection

    try:  # First try the scheduling queue
        job = Job.fetch(job_id, connection=connection)
    except NoSuchJobError:
        return unrecognized_event(job_id, "job")

    scheduler_info = job.meta.get("scheduler_info", dict(scheduler=""))
    scheduler_info_msg = f"{scheduler_info['scheduler']} was used."

    if job.is_finished:
        # The happy path continues below; error_message is a prefix for
        # any problem we still hit while assembling the response.
        error_message = "A scheduling job has been processed with your job ID, but "

    elif job.is_failed:  # Try to inform the user on why the job failed
        e = job.meta.get(
            "exception",
            Exception(
                "The job does not state why it failed. "
                "The worker may be missing an exception handler, "
                "or its exception handler is not storing the exception as job meta data."
            ),
        )
        message = f"Scheduling job failed with {type(e).__name__}: {e}. {scheduler_info_msg}"

        fallback_job_id = job.meta.get("fallback_job_id")

        # redirect to the fallback schedule endpoint if the fallback_job_id
        # is defined in the metadata of the original job
        if fallback_job_id is not None:
            return fallback_schedule_redirect(
                message,
                url_for(
                    "AssetAPI:get_schedule",
                    uuid=fallback_job_id,
                    id=asset.id,
                    _external=True,
                ),
            )
        else:
            return unknown_schedule(message)

    elif job.is_started:
        return unknown_schedule(f"Scheduling job in progress. {scheduler_info_msg}")
    elif job.is_queued:
        return unknown_schedule(
            f"Scheduling job waiting to be processed. {scheduler_info_msg}"
        )
    elif job.is_deferred:
        try:
            preferred_job = job.dependency
        except NoSuchJobError:
            return unknown_schedule(
                f"Scheduling job waiting for unknown job to be processed. {scheduler_info_msg}"
            )
        return unknown_schedule(
            f'Scheduling job waiting for {preferred_job.status} job "{preferred_job.id}" to be processed. {scheduler_info_msg}'
        )
    else:
        return unknown_schedule(
            f"Scheduling job has an unknown status. {scheduler_info_msg}"
        )

    # Collect one schedule entry per child job (one per device/sensor).
    overall_schedule_response = []
    for child_job in job.fetch_dependencies():
        sensor = get_asset_or_sensor_from_ref(child_job.kwargs["asset_or_sensor"])
        schedule_start = child_job.kwargs["start"]

        data_source = get_data_source_for_job(child_job)
        if data_source is None:
            # Bug fix: previously this interpolated `data_source`, which is
            # always None on this path; identify the job instead.
            return unknown_schedule(
                error_message
                + f"no data source could be found for job {child_job.id}. {scheduler_info_msg}"
            )

        power_values = sensor.search_beliefs(
            event_starts_after=schedule_start,
            event_ends_before=schedule_start + planning_horizon,
            source=data_source,
            most_recent_beliefs_only=True,
            one_deterministic_belief_per_event=True,
        )

        sign = 1
        if sensor.get_attribute("consumption_is_positive", True):
            sign = -1

        # For consumption schedules, positive values denote consumption. For the db, consumption is negative
        consumption_schedule = sign * simplify_index(power_values)["event_value"]
        if consumption_schedule.empty:
            # Bug fix: concatenate like the data-source error above;
            # error_message already ends with "but ", so an f-string with a
            # leading space produced a double space here.
            return unknown_schedule(
                error_message
                + f"the schedule was not found in the database. {scheduler_info_msg}"
            )

        # Update the planning window
        # NOTE(review): `duration` is narrowed in place each iteration, so a
        # short schedule for one device also clips the devices after it —
        # presumably intentional (aligned windows), but worth confirming.
        resolution = sensor.event_resolution
        start = consumption_schedule.index[0]
        duration = min(
            duration, consumption_schedule.index[-1] + resolution - start
        )
        consumption_schedule = consumption_schedule[
            start : start + duration - resolution
        ]
        sensor_schedule_response = dict(
            sensor=sensor.id,
            values=consumption_schedule.tolist(),
            start=isodate.datetime_isoformat(start),
            duration=duration_isoformat(duration),
            unit=sensor.unit,
        )
        overall_schedule_response.append(sensor_schedule_response)

    d, s = request_processed(scheduler_info_msg)
    return (
        dict(
            scheduler_info=scheduler_info, schedule=overall_schedule_response, **d
        ),
        s,
    )
14 changes: 8 additions & 6 deletions flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ def test_asset_trigger_and_get_schedule(
scheduler_source = get_data_source_for_job(scheduling_job)
assert scheduler_source is not None

# try to retrieve the schedule through the /sensors/<id>/schedules/<job_id> [GET] api endpoint
# try to retrieve the schedule through the /assets/<id>/schedules/<job_id> [GET] api endpoint
get_schedule_response = client.get(
url_for(
"SensorAPI:get_schedule", id=sensor.id, uuid=scheduling_job.id
), # todo: use (last?) job_id from trigger response
url_for("AssetAPI:get_schedule", id=sensor.id, uuid=job_id),
query_string={"duration": "PT48H"},
)
print("Server responded with:\n%s" % get_schedule_response.json)
assert get_schedule_response.status_code == 200
# assert get_schedule_response.json["type"] == "GetDeviceMessageResponse"
assert len(get_schedule_response.json["values"]) == expected_length_of_schedule
assert len(get_schedule_response.json["schedule"]) == len(message["flex-model"])
assert get_schedule_response.json["schedule"][0]["sensor"] == sensor.id
assert (
len(get_schedule_response.json["schedule"][0]["values"])
== expected_length_of_schedule
)
3 changes: 3 additions & 0 deletions flexmeasures/data/services/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def create_sequential_scheduling_job(
connection=current_app.queues["scheduling"].connection,
)

# Stand-in for MultiStorageScheduler
job.meta["scheduler_info"] = dict(scheduler="A sequential job scheduling policy")

job_status = job.get_status(refresh=True)

jobs.append(job)
Expand Down

0 comments on commit eaf5f4d

Please sign in to comment.