-
Notifications
You must be signed in to change notification settings - Fork 903
/
ENTE.py
126 lines (95 loc) · 3.76 KB
/
ENTE.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
from datetime import datetime
from logging import Logger, getLogger
from zoneinfo import ZoneInfo
from requests import Session
# This parser gets all real-time interconnection flows from the
# Central American Electrical Interconnection System (SIEPAC).
# Map for reference:
# https://www.enteoperador.org/flujos-regionales-en-tiempo-real/
# Endpoint queried with a plain GET by the fetchers in this module; the
# response body is parsed as JSON.
DATA_URL = "https://mapa.enteoperador.org/WebServiceScadaEORRest/webresources/generic"
# Timezone attached to the timestamps of the returned data points.
TIMEZONE = ZoneInfo("America/Tegucigalpa")
# Maps each supported exchange key (the two zone keys sorted and joined with
# "->") to the "nombre" value that identifies the matching reading in the
# JSON response.
JSON_MAPPING = {
    "GT->MX-OR": "2LBR.LT400.1FR2-2LBR-01A.-.MW",
    "GT->SV": "3SISTEMA.LT230.INTER_NET_GT.CMW.MW",
    "GT->HN": "4LEC.LT230.2FR4-4LEC-01B.-.MW",
    "HN->SV": "3SISTEMA.LT230.INTER_NET_HO.CMW.MW",
    "HN->NI": "5SISTEMA.LT230.INTER_NET_HN.CMW.MW",
    "CR->NI": "5SISTEMA.LT230.INTER_NET_CR.CMW.MW",
    "CR->PA": "6SISTEMA.LT230.INTER_NET_PAN.CMW.MW",
}
def floor_to_minute(dt: datetime) -> datetime:
    """Return *dt* truncated to the start of its minute.

    Only the second and microsecond fields are zeroed; every other field of
    *dt* (including its tzinfo) is carried over unchanged.
    """
    zeroed_fields = {"second": 0, "microsecond": 0}
    return dt.replace(**zeroed_fields)
def fetch_production(
    zone_key: str = "HN",
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> dict:
    """Fetch the current total production reading for Honduras (HN).

    Args:
        zone_key: Zone key copied into the result. NOTE(review): the reading
            used here is the HN total regardless of this value.
        session: Optional requests session to reuse; a new one is created
            when omitted.
        target_datetime: Must be falsy; the endpoint only serves live data.
        logger: Logger used to report a fallback to the positional lookup.

    Returns:
        A dict with ``zoneKey``, ``datetime`` (current time in TIMEZONE,
        floored to the minute), ``production`` (``{"unknown": value}`` rounded
        to one decimal; presumably MW, judging by the tag suffix) and
        ``source``.

    Raises:
        NotImplementedError: If ``target_datetime`` is given.
    """
    # Same guard as fetch_exchange: the timestamp below is always "now", so a
    # historical request cannot be honoured.
    if target_datetime:
        raise NotImplementedError("This parser is not yet able to parse past dates")

    http = session or Session()
    readings = http.get(DATA_URL).json()

    # Total production for HN. Historically this was read from the 57th
    # element of the response; looking it up by its tag keeps the parser
    # correct if the feed is ever reordered.
    production_tag = "4SISTEMA.GTOT.OSYMGENTOTR.-.MW"
    legacy_index = 56
    raw_value = next(
        (
            reading["value"]
            for reading in readings
            if isinstance(reading, dict) and reading.get("nombre") == production_tag
        ),
        None,
    )
    if raw_value is None:
        # Preserve the previous positional behaviour when the tag is absent.
        logger.warning(
            "Reading %s not found in ENTE response; falling back to index %d",
            production_tag,
            legacy_index,
        )
        raw_value = readings[legacy_index]["value"]

    # float() mirrors extract_exchange, which converts "value" before use.
    production = round(float(raw_value), 1)

    return {
        "zoneKey": zone_key,
        "datetime": floor_to_minute(datetime.now(tz=TIMEZONE)),
        "production": {"unknown": production},
        "source": "enteoperador.org",
    }
def extract_exchange(raw_data, exchange) -> float | None:
    """Return the signed flow reading for *exchange*, or None if it is absent.

    *raw_data* is iterated for entries whose ``"nombre"`` equals the tag that
    JSON_MAPPING holds for *exchange*; each match's ``"value"`` is converted
    to float. When several entries match, the last one is used.
    """
    tag = JSON_MAPPING[exchange]
    matching_values = [
        float(point["value"]) for point in raw_data if point["nombre"] == tag
    ]
    if not matching_values:
        return None
    flow = matching_values[-1]

    # positive and negative flow directions do not always correspond to EM ordering of exchanges
    sign_flipped = ("GT->SV", "GT->HN", "HN->SV", "CR->NI", "HN->NI")
    return -flow if exchange in sign_flipped else flow
def fetch_exchange(
    zone_key1: str,
    zone_key2: str,
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> dict:
    """Gets an exchange pair from the SIEPAC system.

    Args:
        zone_key1: One zone key of the exchange.
        zone_key2: The other zone key; order does not matter, the pair is
            sorted before lookup.
        session: Optional requests session to reuse; a new one is created
            when omitted.
        target_datetime: Must be falsy; only live data is available.
        logger: Unused here; kept for the common parser signature.

    Returns:
        A dict with ``sortedZoneKeys``, ``datetime`` (current time in
        TIMEZONE, floored to the minute), ``netFlow`` (rounded to one decimal)
        and ``source``.

    Raises:
        NotImplementedError: If ``target_datetime`` is given or the zone pair
            has no entry in JSON_MAPPING.
        ValueError: If the response contains no reading for the exchange.
    """
    if target_datetime:
        raise NotImplementedError("This parser is not yet able to parse past dates")

    sorted_zones = "->".join(sorted([zone_key1, zone_key2]))
    if sorted_zones not in JSON_MAPPING:
        raise NotImplementedError("This exchange is not implemented.")

    http = session or Session()
    raw_data = http.get(DATA_URL).json()

    # extract_exchange returns None when the reading is missing; fail with an
    # explicit message instead of letting round(None, 1) raise a TypeError.
    raw_flow = extract_exchange(raw_data, sorted_zones)
    if raw_flow is None:
        raise ValueError(
            f"No reading found for exchange {sorted_zones} in the ENTE response."
        )

    return {
        "sortedZoneKeys": sorted_zones,
        "datetime": floor_to_minute(datetime.now(tz=TIMEZONE)),
        "netFlow": round(raw_flow, 1),
        "source": "enteoperador.org",
    }
if __name__ == "__main__":
    # Main method, never used by the Electricity Map backend, but handy for testing.
    print("fetch_production(HN) ->")
    print(fetch_production())

    # Zone pairs to exercise. The Mexican side must be "MX-OR": JSON_MAPPING
    # only knows "GT->MX-OR", so ("GT", "MX") would raise NotImplementedError
    # and abort the remaining calls.
    demo_pairs = (
        ("CR", "PA"),
        ("CR", "NI"),
        ("HN", "NI"),
        ("GT", "MX-OR"),
        ("GT", "SV"),
        ("GT", "HN"),
        ("HN", "SV"),
    )
    for zone_a, zone_b in demo_pairs:
        print(f"fetch_exchange({zone_a}, {zone_b}) ->")
        print(fetch_exchange(zone_a, zone_b))