-
Notifications
You must be signed in to change notification settings - Fork 902
/
ESIOS.py
83 lines (65 loc) · 2.43 KB
/
ESIOS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/env python3
from datetime import datetime
from logging import Logger, getLogger
from typing import Optional
from urllib.parse import urlencode
# The arrow library is used to handle datetimes
import arrow
from requests import Response, Session
from .lib.exceptions import ParserException
from .lib.utils import get_token
def fetch_exchange(
    zone_key1: str = "ES",
    zone_key2: str = "MA",
    session: Optional[Session] = None,
    target_datetime: Optional[datetime] = None,
    logger: Logger = getLogger(__name__),
) -> list:
    """Fetch recent exchange values from the ESIOS API (indicator 10209).

    The request window runs from 24 hours ago (floored to the hour) to the
    end of the current hour, both in UTC.

    NOTE: the indicator id is hard-coded in the URL, so ``zone_key1`` and
    ``zone_key2`` only influence the ``sortedZoneKeys`` label and the sign
    of ``netFlow``; they do not select which data is requested.

    Args:
        zone_key1: First zone of the exchange.
        zone_key2: Second zone of the exchange.
        session: Session used for the HTTP request; a new ``Session`` is
            created when omitted.
        target_datetime: Not supported; any truthy value raises.
        logger: Accepted for interface compatibility; not used here.

    Returns:
        A list with one dict per value in the response, each holding the
        keys ``sortedZoneKeys``, ``datetime``, ``netFlow`` and ``source``.

    Raises:
        NotImplementedError: If ``target_datetime`` is given.
        ParserException: If the response status is not 200, the response
            body is empty, or the indicator contains no values.
    """
    if target_datetime:
        raise NotImplementedError("This parser is not yet able to parse past dates")

    # Get ESIOS token
    token = get_token("ESIOS_TOKEN")
    ses = session or Session()

    # Request headers
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json; application/vnd.esios-api-v2+json",
        "Authorization": f'Token token="{token}"',
    }

    # Request query url
    now = arrow.utcnow()
    query = urlencode(
        {
            "start_date": now.shift(hours=-24).floor("hour").isoformat(),
            "end_date": now.ceil("hour").isoformat(),
        }
    )
    url = f"https://api.esios.ree.es/indicators/10209?{query}"

    response: Response = ses.get(url, headers=headers)
    if response.status_code != 200 or not response.text:
        raise ParserException("ESIOS", f"Response code: {response.status_code}")

    values = response.json()["indicator"]["values"]
    if not values:
        raise ParserException("ESIOS", "No values received")

    # Both of these are the same for every value, so compute them once.
    sorted_zone_keys = sorted([zone_key1, zone_key2])
    exchange_key = "->".join(sorted_zone_keys)
    # Datasource negative value is exporting, positive value is importing,
    # so the raw value is negated when zone_key1 sorts first and kept as-is
    # (negated twice) otherwise.
    sign = -1 if zone_key1 == sorted_zone_keys[0] else 1

    return [
        {
            "sortedZoneKeys": exchange_key,
            "datetime": arrow.get(entry["datetime_utc"]).datetime,
            "netFlow": sign * entry["value"],
            "source": "api.esios.ree.es",
        }
        for entry in values
    ]
if __name__ == "__main__":
    # Manual run: fetch the exchange for the ES/MA zone pair with a fresh
    # HTTP session and print the resulting list.
    print(fetch_exchange(zone_key1="ES", zone_key2="MA", session=Session()))