-
Notifications
You must be signed in to change notification settings - Fork 902
/
FR-COR_IT-SAR.py
129 lines (115 loc) · 4.15 KB
/
FR-COR_IT-SAR.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from datetime import datetime, timedelta
from logging import Logger, getLogger
from typing import Callable, Dict, List, Literal, Optional
from requests import Session
from .ENTSOE import fetch_exchange as fetch_ENTSOE_exchange
from .ENTSOE import fetch_exchange_forecast as fetch_ENTSOE_exchange_forecast
from .lib.config import refetch_frequency
from .lib.exceptions import ParserException
"""
Fetches data for the IT-SACOAC->IT-SAR and IT-SACODC->IT-SAR exchanges
by wrapping the ENTSOE.py parser and combining the data from the two
to produce a single data list for the FR-COR->IT-SAR exchange.
"""
EXCHANGE_FUNCTION_MAP: Dict[str, Callable] = {
"exchange": fetch_ENTSOE_exchange,
"exchange_forecast": fetch_ENTSOE_exchange_forecast,
}
def fetch_data(
zone_key1: str,
zone_key2: str,
session: Optional[Session],
target_datetime: Optional[datetime],
logger: Logger,
type: Literal["exchange", "exchange_forecast"],
) -> List[dict]:
if target_datetime is None:
target_datetime = datetime.utcnow()
# IT-SACOAC to IT-SAR
AC_dataList = EXCHANGE_FUNCTION_MAP[type](
zone_key1="FR-COR-AC",
zone_key2="IT-SAR",
session=session,
target_datetime=target_datetime,
logger=logger,
)
# IT-SACODC to IT-SAR
DC_dataList = EXCHANGE_FUNCTION_MAP[type](
zone_key1="FR-COR-DC",
zone_key2="IT-SAR",
session=session,
target_datetime=target_datetime,
logger=logger,
)
returnList: List[dict] = []
# Compare the length and datetimes of the two data lists and
# if they are the same combine the data from the two lists.
if (
len(AC_dataList) == len(DC_dataList)
and AC_dataList[0]["datetime"] == DC_dataList[0]["datetime"]
and AC_dataList[-1]["datetime"] == DC_dataList[-1]["datetime"]
):
logger.info("Clean match! Merging data with zip")
for AC, DC in zip(AC_dataList, DC_dataList):
returnList.append(
{
"datetime": AC["datetime"],
"netFlow": AC["netFlow"] + DC["netFlow"],
"sortedZoneKeys": "FR-COR->IT-SAR",
"source": AC["source"],
}
)
# Parse values from the two lists and loop over them to find missing datetimes
# if the two lists are not of equal length and do not have the same datetimes.
else:
logger.info("Data mismatch! Looping over both lists to match datetimes")
for AC_data in AC_dataList:
for DC_data in DC_dataList:
if AC_data["datetime"] == DC_data["datetime"]:
returnList.append(
{
"datetime": AC_data["datetime"],
"netFlow": AC_data["netFlow"] + DC_data["netFlow"],
"sortedZoneKeys": "FR-COR->IT-SAR",
"source": AC_data["source"],
}
)
if returnList != []:
return returnList
else:
raise ParserException(
parser="FR-COR_IT-SAR.py",
message=f"No matching AC and DC data found for {zone_key1}->{zone_key2} exchange at {target_datetime}.",
)
@refetch_frequency(timedelta(days=2))
def fetch_exchange(
zone_key1: str = "FR-COR",
zone_key2: str = "IT-SAR",
session: Optional[Session] = None,
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> List[dict]:
return fetch_data(
zone_key1=zone_key1,
zone_key2=zone_key2,
session=session,
target_datetime=target_datetime,
logger=logger,
type="exchange",
)
@refetch_frequency(timedelta(days=2))
def fetch_exchange_forecast(
zone_key1: str = "FR-COR",
zone_key2: str = "IT-SAR",
session: Optional[Session] = None,
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> List[dict]:
return fetch_data(
zone_key1=zone_key1,
zone_key2=zone_key2,
session=session,
target_datetime=target_datetime,
logger=logger,
type="exchange_forecast",
)