/
DK.py
132 lines (116 loc) · 4.76 KB
/
DK.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
from requests import Response, Session
from electricitymap.contrib.lib.models.event_lists import ExchangeList
from electricitymap.contrib.lib.types import ZoneKey
from parsers.lib.config import refetch_frequency
from parsers.lib.exceptions import ParserException
EXCHANGE_MAPPING = {
"DE->DK-DK1": {"id": "ExchangeGermany", "direction": 1, "priceArea": "DK1"},
"DE->DK-DK2": {"id": "ExchangeGermany", "direction": 1, "priceArea": "DK2"},
"DK-DK1->DK-DK2": {"id": "ExchangeGreatBelt", "direction": -1, "priceArea": "DK1"},
"DK-DK1->NL": {"id": "ExchangeNetherlands", "direction": -1, "priceArea": "DK1"},
"DK-DK1->NO-NO2": {"id": "ExchangeNorway", "direction": -1, "priceArea": "DK1"},
"DK-DK1->SE-SE3": {"id": "ExchangeSweden", "direction": -1, "priceArea": "DK1"},
"DK-DK2->SE-SE4": {"id": "ExchangeSweden", "direction": -1, "priceArea": "DK2"},
"DK-BHM->SE-SE4": {"id": "BornholmSE4", "direction": -1, "priceArea": "DK2"},
}
def fetch_data(
sorted_keys: ZoneKey,
session: Session | None,
target_datetime: datetime | None,
logger: Logger,
) -> dict:
"""
Helper function to fetch data from the API.
"""
ses = session or Session()
if target_datetime and target_datetime.tzinfo:
# Data source doesn't support timezone aware
# datetimes.
target_datetime = target_datetime.replace(tzinfo=None)
price_area = EXCHANGE_MAPPING[sorted_keys]["priceArea"]
params = {
"limit": 500,
"filter": '{"PriceArea":"DK1"}'
if price_area == "DK1"
else '{"PriceArea":"DK2"}',
"start": target_datetime.strftime("%Y-%m-%d") if target_datetime else None,
"end": (target_datetime + timedelta(days=1)).strftime("%Y-%m-%d")
if target_datetime
else None,
}
response: Response = ses.get(
"https://api.energidataservice.dk/dataset/ElectricityProdex5MinRealtime",
params=params,
)
if response.ok:
data = response.json()
if data["total"] == 0:
raise ParserException(
parser="DK.py",
zone_key=sorted_keys,
message=f"No exchange data was returned for {target_datetime.date() or datetime.now().date()}",
)
else:
return data
else:
raise ParserException(
parser="DK.py",
zone_key=sorted_keys,
message=f"No exchange data was returned for {target_datetime.date() or datetime.now().date()}",
)
def flow(sorted_keys: ZoneKey, datapoint: dict) -> int | float | None:
"""
Helper function to extract the net flow from a datapoint.
"""
if sorted_keys == "DK-DK2->SE-SE4":
# Exchange from Bornholm to Sweden is included in "ExchangeSweden"
# but Bornholm island is reported separately from DK-DK2 in Electricity Maps
return (
datapoint[EXCHANGE_MAPPING["DK-DK2->SE-SE4"]["id"]]
* EXCHANGE_MAPPING["DK-DK2->SE-SE4"]["direction"]
- datapoint[EXCHANGE_MAPPING["DK-BHM->SE-SE4"]["id"]]
* EXCHANGE_MAPPING["DK-BHM->SE-SE4"]["direction"]
if datapoint[EXCHANGE_MAPPING["DK-BHM->SE-SE4"]["id"]] is not None
and datapoint[EXCHANGE_MAPPING["DK-DK2->SE-SE4"]["id"]] is not None
else None
)
else:
return (
datapoint[EXCHANGE_MAPPING[sorted_keys]["id"]]
* EXCHANGE_MAPPING[sorted_keys]["direction"]
if datapoint[EXCHANGE_MAPPING[sorted_keys]["id"]]
else None
)
@refetch_frequency(timedelta(days=1))
def fetch_exchange(
zone_key1: ZoneKey,
zone_key2: ZoneKey,
session: Session = Session(),
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict]:
sorted_keys = ZoneKey("->".join(sorted([zone_key1, zone_key2])))
data = fetch_data(sorted_keys, session, target_datetime, logger)
all_exchange_data = ExchangeList(logger)
if sorted_keys not in EXCHANGE_MAPPING:
raise ParserException(
"DK.py",
sorted_keys,
"Only able to fetch data for exchanges that are connected to Denmark (DK-DK1, DK-DK2, DK-BHM)",
)
else:
for datapoint in data["records"]:
all_exchange_data.append(
zoneKey=sorted_keys,
datetime=datetime.fromisoformat(datapoint["Minutes5UTC"]).replace(
tzinfo=timezone.utc
),
netFlow=flow(sorted_keys, datapoint),
source="energidataservice.dk",
)
return all_exchange_data.to_list()
if __name__ == "__main__":
print("fetch_exchange(DK-DK1, DE) ->")
print(fetch_exchange("DK-DK2", "SE-SE4"))