-
Notifications
You must be signed in to change notification settings - Fork 902
/
NZ.py
138 lines (115 loc) · 4.72 KB
/
NZ.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
# The arrow library is used to handle datetimes
import json
from datetime import datetime
from logging import Logger, getLogger
from typing import Optional
import arrow
from bs4 import BeautifulSoup
# The request library is used to fetch content through HTTP
from requests import Session
timezone = "Pacific/Auckland"
NZ_PRICE_REGIONS = set([i for i in range(1, 14)])
def fetch(session: Optional[Session] = None):
r = session or Session()
url = "https://www.transpower.co.nz/system-operator/live-system-and-market-data/consolidated-live-data"
response = r.get(url)
soup = BeautifulSoup(response.text, "html.parser")
for item in soup.find_all("script"):
if item.attrs.get("data-drupal-selector"):
body = item.contents[0]
obj = json.loads(body)
return obj
def fetch_price(
zone_key: str = "NZ",
session: Optional[Session] = None,
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> dict:
"""
Requests the current price of electricity based on the zone key.
Note that since EM6 breaks the electricity price down into regions,
the regions are averaged out for each island.
"""
if target_datetime:
raise NotImplementedError(
"This parser is not able to retrieve data for past dates"
)
r = session or Session()
url = "https://api.em6.co.nz/ords/em6/data_api/region/price/"
response = r.get(url, verify=False)
obj = response.json()
region_prices = []
regions = NZ_PRICE_REGIONS
for item in obj.get("items"):
region = item.get("grid_zone_id")
if region in regions:
time = item.get("timestamp")
price = float(item.get("price"))
region_prices.append(price)
avg_price = sum(region_prices) / len(region_prices)
datetime = arrow.get(time, tzinfo="UTC")
return {
"datetime": datetime.datetime,
"price": avg_price,
"currency": "NZD",
"source": "api.em6.co.nz",
"zoneKey": zone_key,
}
def fetch_production(
zone_key: str = "NZ",
session: Optional[Session] = None,
target_datetime: Optional[datetime] = None,
logger: Logger = getLogger(__name__),
) -> dict:
"""Requests the last known production mix (in MW) of a given zone."""
if target_datetime:
raise NotImplementedError(
"This parser is not able to retrieve data for past dates"
)
obj = fetch(session)
datetime = arrow.get(str(obj["soPgenGraph"]["timestamp"]), "X").datetime
region_key = "New Zealand"
productions = obj["soPgenGraph"]["data"][region_key]
data = {
"zoneKey": zone_key,
"datetime": datetime,
"production": {
"coal": productions.get("Coal", {"generation": None})["generation"],
"oil": productions.get("Liquid", {"generation": None})["generation"],
"gas": productions.get("Gas", {"generation": None})["generation"],
"geothermal": productions.get("Geothermal", {"generation": None})[
"generation"
],
"wind": productions.get("Wind", {"generation": None})["generation"],
"hydro": productions.get("Hydro", {"generation": None})["generation"],
"solar": productions.get("Solar", {"generation": None})["generation"],
"unknown": productions.get("Co-Gen", {"generation": None})["generation"],
"nuclear": 0, # famous issue in NZ politics
},
"capacity": {
"coal": productions.get("Coal", {"capacity": None})["capacity"],
"oil": productions.get("Liquid", {"capacity": None})["capacity"],
"gas": productions.get("Gas", {"capacity": None})["capacity"],
"geothermal": productions.get("Geothermal", {"capacity": None})["capacity"],
"wind": productions.get("Wind", {"capacity": None})["capacity"],
"hydro": productions.get("Hydro", {"capacity": None})["capacity"],
"solar": productions.get("Solar", {"capacity": None})["capacity"],
"battery storage": productions.get("Battery", {"capacity": None})[
"capacity"
],
"unknown": productions.get("Co-Gen", {"capacity": None})["capacity"],
"nuclear": 0, # famous issue in NZ politics
},
"storage": {
"battery": productions.get("Battery", {"generation": None})["generation"],
},
"source": "transpower.co.nz",
}
return data
if __name__ == "__main__":
"""Main method, never used by the Electricity Map backend, but handy for testing."""
print("fetch_price(NZ) ->")
print(fetch_price("NZ"))
print("fetch_production(NZ) ->")
print(fetch_production("NZ"))