-
Notifications
You must be signed in to change notification settings - Fork 902
/
CH.py
127 lines (103 loc) · 4.01 KB
/
CH.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
from datetime import datetime, timedelta
from logging import Logger, getLogger
from typing import Optional
import arrow
import pandas as pd
from requests import Session
from parsers.lib.config import refetch_frequency
from . import ENTSOE
def get_solar_capacity_at(date: datetime) -> float:
    """Return the Swiss solar capacity record in effect at ``date``.

    The lookup table holds one record per year, each dated 1 January (UTC).
    The most recent record that is not later than ``date`` is returned. Dates
    that precede the first record fall back to the first record.

    Args:
        date: Moment to look up. Timezone-aware values are compared as
            instants; naive values are interpreted as UTC.

    Returns:
        The capacity value as a plain ``float`` (units as published by the
        source below; presumably MW -- TODO confirm).
    """
    # Prepare historical records
    # Source https://www.uvek-gis.admin.ch/BFE/storymaps/EE_Elektrizitaetsproduktionsanlagen/?lang=en
    historical_capacities = pd.DataFrame.from_records(
        [
            ("2015-01-01", 1385),
            ("2016-01-01", 1632),
            ("2017-01-01", 1844),
            ("2018-01-01", 2070),
            ("2019-01-01", 2346),
            ("2020-01-01", 2749),
            ("2021-01-01", 3129),
        ],
        columns=["datetime", "capacity.solar"],
    ).set_index("datetime")
    historical_capacities.index = pd.DatetimeIndex(
        historical_capacities.index, tz="UTC"
    )
    solar = historical_capacities["capacity.solar"]

    moment = pd.Timestamp(date)
    if moment.tzinfo is None:
        # A naive value cannot be compared with the tz-aware index.
        moment = moment.tz_localize("UTC")

    # Compare instants instead of calendar years: a date whose local year is
    # 2015 can still lie before 2015-01-01 UTC, which would otherwise leave
    # nothing to pick from.
    applicable = solar[solar.index <= moment]
    if applicable.empty:
        return float(solar.iloc[0])
    return float(applicable.iloc[-1])
def fetch_swiss_exchanges(session, target_datetime, logger):
    """Sum Switzerland's net flows with its neighbours, keyed by timestamp.

    ``ENTSOE.fetch_exchange`` is queried once per neighbouring zone (AT, DE,
    IT, FR). The ``netFlow`` values that share the same ``datetime`` are added
    together; a neighbour for which nothing is returned contributes nothing.
    """
    neighbours = ("AT", "DE", "IT", "FR")
    totals = {}
    for neighbour in neighbours:
        flows = ENTSOE.fetch_exchange(
            zone_key1="CH",
            zone_key2=neighbour,
            session=session,
            target_datetime=target_datetime,
            logger=logger,
        )
        # A falsy result (nothing fetched) simply yields no iterations.
        for flow in flows or ():
            timestamp = flow["datetime"]
            if timestamp in totals:
                totals[timestamp] += flow["netFlow"]
            else:
                totals[timestamp] = flow["netFlow"]
    return totals
def fetch_swiss_consumption(
    session: Session, target_datetime: datetime, logger: Logger
) -> dict:
    """Return Switzerland's total consumption keyed by timestamp.

    Args:
        session: HTTP session forwarded to the ENTSOE parser.
        target_datetime: Moment for which consumption is requested.
        logger: Logger forwarded to the ENTSOE parser.

    Returns:
        A mapping from each entry's ``datetime`` to its ``consumption``
        value. The mapping is empty when the ENTSOE parser returns nothing,
        so callers see "no timestamps available" rather than a ``TypeError``.
    """
    consumptions = ENTSOE.fetch_consumption(
        zone_key="CH", session=session, target_datetime=target_datetime, logger=logger
    )
    if not consumptions:
        # Nothing fetched (None or empty): there is nothing to index.
        return {}
    return {entry["datetime"]: entry["consumption"] for entry in consumptions}
@refetch_frequency(timedelta(days=1))
def fetch_production(
    zone_key: str = "CH",
    session: Optional[Session] = None,
    target_datetime: Optional[datetime] = None,
    logger: Logger = getLogger(__name__),
):
    """
    Returns the total production by type for Switzerland.

    Currently the majority of the run-of-river production is missing, so the
    gap between the sum of all reported production types and the total
    production is allocated to 'unknown'. The total production is computed as
    consumption + storage + net imports; 'unknown' is never negative.

    Entries whose timestamp has no matching exchange or consumption value are
    left without an 'unknown' figure. Every entry receives a 'capacity' dict
    holding the solar capacity for its timestamp. Nothing is returned when the
    ENTSOE parser yields no production data.
    """
    if target_datetime:
        reference_time = arrow.get(target_datetime).to("Europe/Zurich").datetime
    else:
        reference_time = arrow.now(tz="Europe/Zurich").datetime

    http_session = session or Session()
    exchanges = fetch_swiss_exchanges(http_session, reference_time, logger)
    consumptions = fetch_swiss_consumption(http_session, reference_time, logger)
    productions = ENTSOE.fetch_production(
        zone_key=zone_key,
        session=http_session,
        target_datetime=reference_time,
        logger=logger,
    )
    if not productions:
        return None

    # First pass: derive the 'unknown' share where all inputs are available.
    for entry in productions:
        timestamp = entry["datetime"]
        if timestamp in exchanges and timestamp in consumptions:
            reported = sum(value or 0 for value in entry["production"].values())
            stored = sum(value or 0 for value in entry["storage"].values())
            total = consumptions[timestamp] + stored + exchanges[timestamp]
            remainder = total - reported
            entry["production"]["unknown"] = remainder if remainder > 0 else 0

    # Second pass: attach the solar capacity to every entry.
    for entry in productions:
        entry["capacity"] = {"solar": get_solar_capacity_at(entry["datetime"])}

    return productions
if __name__ == "__main__":
    # Direct execution: fetch with the default arguments and print the result.
    latest_production = fetch_production()
    print(latest_production)