-
Notifications
You must be signed in to change notification settings - Fork 903
/
NO-NO4_SE.py
129 lines (116 loc) · 4.2 KB
/
NO-NO4_SE.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
from typing import Literal
from requests import Session
from .ENTSOE import fetch_exchange as fetch_ENTSOE_exchange
from .ENTSOE import fetch_exchange_forecast as fetch_ENTSOE_exchange_forecast
from .lib.config import refetch_frequency
from .lib.exceptions import ParserException
"""
Fetches data for the NO-NO4->SE-SE1 and NO-NO4->SE-SE2 exchanges
by wrapping the ENTSOE.py parser and combining the data from the two
to produce a single data list for the NO-NO4->SE exchange.
"""
EXCHANGE_FUNCTION_MAP: dict[str, Callable] = {
"exchange": fetch_ENTSOE_exchange,
"exchange_forecast": fetch_ENTSOE_exchange_forecast,
}
def fetch_data(
zone_key1: str,
zone_key2: str,
session: Session | None,
target_datetime: datetime | None,
logger: Logger,
exchange_type: Literal["exchange", "exchange_forecast"],
) -> list[dict]:
if target_datetime is None:
target_datetime = datetime.now(timezone.utc)
# NO-NO4 to SE-SE1
SE1_dataList = EXCHANGE_FUNCTION_MAP[exchange_type](
zone_key1="NO-NO4",
zone_key2="SE-SE1",
session=session,
target_datetime=target_datetime,
logger=logger,
)
# NO-NO4 to SE-SE2
SE2_dataList = EXCHANGE_FUNCTION_MAP[exchange_type](
zone_key1="NO-NO4",
zone_key2="SE-SE2",
session=session,
target_datetime=target_datetime,
logger=logger,
)
returnList: list[dict] = []
# Compare the length and datetimes of the two data lists and
# if they are the same combine the data from the two lists.
if (
len(SE1_dataList) == len(SE2_dataList)
and SE1_dataList[0]["datetime"] == SE2_dataList[0]["datetime"]
and SE1_dataList[-1]["datetime"] == SE2_dataList[-1]["datetime"]
):
logger.info("Clean match! Merging data with zip")
for SE1, SE2 in zip(SE1_dataList, SE2_dataList, strict=True):
returnList.append(
{
"datetime": SE1["datetime"],
"netFlow": SE1["netFlow"] + SE2["netFlow"],
"sortedZoneKeys": "NO-NO4->SE",
"source": SE1["source"],
}
)
# Parse values from the two lists and loop over them to find missing datetimes
# if the two lists are not of equal length and do not have the same datetimes.
else:
logger.info("Data mismatch! Recursively looping over lists to match data")
for SE1_data in SE1_dataList:
for SE2_data in SE2_dataList:
if SE1_data["datetime"] == SE2_data["datetime"]:
returnList.append(
{
"datetime": SE1_data["datetime"],
"netFlow": SE1_data["netFlow"] + SE2_data["netFlow"],
"sortedZoneKeys": "NO-NO4->SE",
"source": SE1_data["source"],
}
)
if returnList != []:
return returnList
else:
raise ParserException(
parser="NO-NO4_SE.py",
message=f"No matching exchange data found between {zone_key1} and {zone_key2} at {target_datetime}.",
)
@refetch_frequency(timedelta(days=2))
def fetch_exchange(
zone_key1: str = "NO-NO4",
zone_key2: str = "SE",
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict]:
return fetch_data(
zone_key1=zone_key1,
zone_key2=zone_key2,
session=session,
target_datetime=target_datetime,
logger=logger,
exchange_type="exchange",
)
@refetch_frequency(timedelta(days=2))
def fetch_exchange_forecast(
zone_key1: str = "NO-NO4",
zone_key2: str = "SE",
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict]:
return fetch_data(
zone_key1=zone_key1,
zone_key2=zone_key2,
session=session,
target_datetime=target_datetime,
logger=logger,
exchange_type="exchange_forecast",
)