-
-
Notifications
You must be signed in to change notification settings - Fork 52
/
api.py
396 lines (318 loc) · 12.4 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
# -*- coding: utf-8 -*-
# Copyright (c) 2018-2021, earthobservations developers.
# Distributed under the MIT License. See LICENSE for more info.
import gzip
import logging
from datetime import datetime
from enum import Enum
from io import BytesIO
from typing import Generator, Optional, Tuple, Union
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from requests_ftp.ftp import FTPSession
from urllib3 import Retry
from wetterdienst.core.scalar.request import ScalarRequestCore
from wetterdienst.core.scalar.values import ScalarValuesCore
from wetterdienst.exceptions import FailedDownload
from wetterdienst.metadata.columns import Columns
from wetterdienst.metadata.period import Period, PeriodType
from wetterdienst.metadata.provider import Provider
from wetterdienst.metadata.resolution import Resolution, ResolutionType
from wetterdienst.metadata.timezone import Timezone
from wetterdienst.provider.eccc.observation.metadata.dataset import (
EcccObservationDataset,
EcccObservationDatasetTree,
)
from wetterdienst.provider.eccc.observation.metadata.parameter import (
EcccObservationParameter,
)
from wetterdienst.provider.eccc.observation.metadata.resolution import (
EccObservationResolution,
)
from wetterdienst.provider.eccc.observation.metadata.unit import (
EcccObservationUnitOrigin,
EcccObservationUnitSI,
)
from wetterdienst.util.cache import payload_cache_twelve_hours
# Module-level logger, named after this module per stdlib logging convention.
log = logging.getLogger(__name__)
class EcccObservationValues(ScalarValuesCore):
    """Values class for Environment and Climate Change Canada (ECCC) observations.

    Downloads bulk CSV files from the ECCC climate data service, one file per
    year (or per month for hourly resolution), and reshapes them into the
    wetterdienst tidy format.
    """

    _string_parameters = []
    _integer_parameters = []
    _irregular_parameters = []

    _data_tz = Timezone.UTC
    _has_quality = True

    # Shared HTTP session with retries; the ECCC endpoint is flaky at times.
    _http = requests.Session()
    _http.mount("http://", HTTPAdapter(max_retries=Retry(total=10, connect=5, read=5)))

    # Bulk download endpoint; {0} -> station id, {1} -> timeframe code.
    _base_url = (
        "http://climate.weather.gc.ca/climate_data/bulk_data_e.html?"
        "format=csv&stationID={0}&timeframe={1}"
        "&submit= Download+Data"
    )

    # Resolution -> "timeframe" code used in the bulk download URL.
    _timeframe_mapping = {
        Resolution.HOURLY: "1",
        Resolution.DAILY: "2",
        Resolution.MONTHLY: "3",
        Resolution.ANNUAL: "4",
    }

    @property
    def _timeframe(self) -> str:
        """Internal timeframe string for the requested resolution."""
        return self._timeframe_mapping.get(self.stations.stations.resolution)

    # Resolution -> abbreviation used by ECCC (e.g. "HLY" for hourly).
    _time_step_mapping = {
        Resolution.HOURLY: "HLY",
        Resolution.DAILY: "DLY",
        Resolution.MONTHLY: "MLY",
        Resolution.ANNUAL: "ANL",
    }

    @property
    def _time_step(self):
        """Internal time step string for the requested resolution."""
        return self._time_step_mapping.get(self.stations.stations.resolution)

    def _create_humanized_parameters_mapping(self):
        """Map original parameter values to lowercase enum-member names."""
        # TODO: change to something general, depending on ._has_datasets
        hcnm = {
            parameter.value: parameter.name.lower()
            for parameter in self.stations.stations._parameter_base[
                self.stations.stations.resolution.name
            ]
        }

        return hcnm

    @staticmethod
    def _tidy_up_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Tidy up dataframe pairwise by column.

        ECCC CSVs interleave value and quality-flag columns,
        e.g. 'DATE', 'Temp (°C)', 'Temp Flag', ... — each pair is melted
        into (date, parameter, value, quality) rows.
        """
        columns = df.columns
        frames = []

        # columns[0] is the date; afterwards (value, flag) pairs alternate.
        for parameter_column, quality_column in zip(columns[1::2], columns[2::2]):
            df_parameter = pd.DataFrame(
                {
                    Columns.DATE.value: df[Columns.DATE.value],
                    Columns.VALUE.value: df[parameter_column],
                    Columns.QUALITY.value: df[quality_column],
                }
            )
            df_parameter[Columns.PARAMETER.value] = parameter_column
            frames.append(df_parameter)

        # Collect-then-concat instead of repeated DataFrame.append (deprecated
        # in pandas and quadratic in the number of columns).
        df_tidy = pd.concat(frames) if frames else pd.DataFrame()

        return df_tidy.reindex(
            columns=[
                Columns.DATE.value,
                Columns.PARAMETER.value,
                Columns.VALUE.value,
                Columns.QUALITY.value,
            ]
        )

    def _collect_station_parameter(
        self, station_id: str, parameter: Tuple[Enum, Enum]
    ) -> pd.DataFrame:
        """Collect data for one station and one (parameter, dataset) pair.

        :param station_id: ECCC station identifier
        :param parameter: tuple of (parameter enum, dataset enum)
        :return: DataFrame with the station's data, tidied if requested
        """
        parameter, dataset = parameter

        meta = self.stations.df[
            self.stations.df[Columns.STATION_ID.value] == station_id
        ]

        name, from_date, to_date = (
            meta[
                [
                    Columns.NAME.value,
                    Columns.FROM_DATE.value,
                    Columns.TO_DATE.value,
                ]
            ]
            .values.flatten()
            .tolist()
        )

        # start and end year from station
        start_year = None if pd.isna(from_date) else from_date.year
        end_year = None if pd.isna(to_date) else to_date.year

        # start_date and end_date from request
        start_date = self.stations.stations.start_date
        end_date = self.stations.stations.end_date

        # Clamp the station's availability window to the requested period.
        start_year = start_year and max(
            start_year, start_date and start_date.year or start_year
        )
        end_year = end_year and min(end_year, end_date and end_date.year or end_year)

        # Following lines may partially be based on @Zeitsperre's canada-climate-python
        # code at https://github.com/Zeitsperre/canada-climate-python/blob/
        # master/ECCC_stations_fulldownload.py
        frames = []

        # check that station has a first and last year value
        if start_year and end_year:
            for url in self._create_file_urls(station_id, start_year, end_year):
                log.info(f"Acquiring file from {url}")

                # TODO: certificate verification is disabled (verify=False);
                #  re-enable once the server-side TLS issue is resolved.
                payload = self._http.get(url, timeout=60, verify=False)

                df_temp = pd.read_csv(BytesIO(payload.content))
                df_temp = df_temp.rename(columns=str.lower)
                # Drop station metadata columns duplicated in every file.
                df_temp = df_temp.drop(
                    columns=[
                        "longitude (x)",
                        "latitude (y)",
                        "station name",
                        "climate id",
                        "year",
                        "month",
                        "day",
                        "time (lst)",
                    ],
                    errors="ignore",
                )
                frames.append(df_temp)

        # Collect-then-concat instead of repeated DataFrame.append (deprecated
        # in pandas and quadratic in the number of files).
        df = pd.concat(frames) if frames else pd.DataFrame()

        df = df.rename(
            columns={
                "date/time (lst)": Columns.DATE.value,
                "date/time": Columns.DATE.value,
            }
        )
        df = df.reset_index(drop=True)

        df = df.drop(columns=["data quality"], errors="ignore")

        if self.stations.stations.tidy:
            df = self._tidy_up_dataframe(df)

            # Filter to the single requested parameter unless a whole
            # dataset was requested.
            if parameter not in self.stations.stations._dataset_base:
                df = df[df[Columns.PARAMETER.value] == parameter.value]

        df[Columns.STATION_ID.value] = station_id

        return df

    def _create_file_urls(
        self, station_id: str, start_year: int, end_year: int
    ) -> Generator[str, None, None]:
        """Yield bulk download URLs for the given station and year range.

        For hourly resolution one URL per month is produced (the endpoint
        serves hourly data month-wise), otherwise one URL per year.

        :param station_id: ECCC station identifier
        :param start_year: first year to download
        :param end_year: last year to download (inclusive)
        """
        # TODO: make faster, requests per month take too long!
        resolution = self.stations.stations.resolution

        freq = "Y"
        if resolution == Resolution.HOURLY:
            freq = "M"

        # For hourly data request only necessary data to reduce amount of data being
        # downloaded and parsed
        for date in pd.date_range(
            f"{start_year}-01-01", f"{end_year + 1}-01-01", freq=freq, closed=None
        ):
            url = self._base_url.format(int(station_id), self._timeframe)

            url += f"&Year={date.year}"

            if resolution == Resolution.HOURLY:
                # BUG FIX: this literal was missing the f-prefix, so the raw
                # text "{date.month}" was sent as the Month query parameter.
                url += f"&Month={date.month}"

            yield url
class EcccObservationRequest(ScalarRequestCore):
    """
    Download weather data from Environment and Climate Change Canada (ECCC).
    - https://www.canada.ca/en/environment-climate-change.html
    - https://www.canada.ca/en/services/environment/weather.html

    Original code by Trevor James Smith. Thanks!
    - https://github.com/Zeitsperre/canada-climate-python
    """

    provider = Provider.ECCC

    _tz = Timezone.UTC

    _resolution_base = EccObservationResolution
    _resolution_type = ResolutionType.MULTI
    _period_type = PeriodType.FIXED
    _period_base = Period.HISTORICAL
    _parameter_base = EcccObservationParameter  # replace with parameter enumeration

    _has_datasets = True
    _dataset_base = EcccObservationDataset
    _dataset_tree = EcccObservationDatasetTree
    _unique_dataset = True

    _origin_unit_tree = EcccObservationUnitOrigin
    _si_unit_tree = EcccObservationUnitSI

    _values = EcccObservationValues

    @property
    def _columns_mapping(self) -> dict:
        """Full mapping from raw CSV column names to wetterdienst columns."""
        # BUG FIX: copy before updating — previously the class-level
        # _base_columns_mapping dict itself was mutated in place, so date
        # columns from one resolution leaked into every later instance.
        cm = self._base_columns_mapping.copy()

        cm.update(self._dates_columns_mapping)

        return cm

    @property
    def _dates_columns_mapping(self) -> dict:
        """Resolution-dependent mapping of first/last-year CSV columns."""
        dcm = {}

        from_date, to_date = None, None
        if self.resolution == Resolution.HOURLY:
            from_date, to_date = "hly first year", "hly last year"
        elif self.resolution == Resolution.DAILY:
            from_date, to_date = "dly first year", "dly last year"
        elif self.resolution == Resolution.MONTHLY:
            from_date, to_date = "mly first year", "mly last year"
        elif self.resolution == Resolution.ANNUAL:
            from_date, to_date = "first year", "last year"

        dcm.update(
            {
                from_date: Columns.FROM_DATE.value,
                to_date: Columns.TO_DATE.value,
            }
        )

        return dcm

    # Resolution-independent part of the station-list column mapping.
    _base_columns_mapping: dict = {
        "station id": Columns.STATION_ID.value,
        "name": Columns.NAME.value,
        "province": Columns.STATE.value,
        # "CLIMATE_ID",
        # "WMO_ID",
        # "TC_ID",
        "latitude (decimal degrees)": Columns.LATITUDE.value,
        "longitude (decimal degrees)": Columns.LONGITUDE.value,
        "elevation (m)": Columns.HEIGHT.value,
    }

    def __init__(
        self,
        parameter: Tuple[Union[str, EcccObservationParameter]],
        resolution: Union[EccObservationResolution, Resolution],
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        humanize: bool = True,
        tidy: bool = True,
        si_units: bool = True,
    ):
        """
        :param parameter: parameter(s) or dataset(s) to request
        :param resolution: resolution of the data (hourly ... annual)
        :param start_date: start date of the requested period
        :param end_date: end date of the requested period
        :param humanize: rename parameters to human-readable names
        :param tidy: reshape data into tidy (long) format
        :param si_units: convert values to SI units
        """
        super(EcccObservationRequest, self).__init__(
            parameter=parameter,
            resolution=resolution,
            period=Period.HISTORICAL,
            start_date=start_date,
            end_date=end_date,
            humanize=humanize,
            tidy=tidy,
            si_units=si_units,
        )

    def _all(self) -> pd.DataFrame:
        """Acquire and normalize the full ECCC station list."""
        # Acquire raw CSV payload.
        csv_payload = self._download_stations()

        # Read into Pandas data frame; the first two lines are preamble.
        df = pd.read_csv(BytesIO(csv_payload), header=2, dtype=str)

        df = df.rename(columns=str.lower)

        # Drop the non-decimal coordinate columns; the decimal-degree
        # variants are kept via _columns_mapping.
        df = df.drop(columns=["latitude", "longitude"])

        df = df.rename(columns=self._columns_mapping)

        return df

    @staticmethod
    @payload_cache_twelve_hours.cache_on_arguments()
    def _download_stations() -> bytes:
        """
        Download station list from ECCC FTP server, falling back to a
        gzipped mirror on GitHub when the FTP source is unreachable.

        :return: CSV payload
        :raises FailedDownload: when both sources fail
        """
        ftp_url = (
            "ftp://client_climate:foobar@ftp.tor.ec.gc.ca"
            "/Pub/Get_More_Data_Plus_de_donnees/Station Inventory EN.csv"
        )

        http_url = (
            "https://raw.githubusercontent.com/earthobservations/testdata"
            "/main/ftp.tor.ec.gc.ca/Pub/Get_More_Data_Plus_de_donnees"
            "/Station%20Inventory%20EN.csv.gz"
        )

        payload = None

        # Try original source.
        session = FTPSession()
        try:
            response = session.retr(ftp_url)
            payload = response.content
        except Exception:
            log.exception(f"Unable to access FTP server at {ftp_url}")

            # Fall back to different source.
            try:
                # Timeout added so a stalled connection cannot hang forever.
                response = requests.get(http_url, timeout=60)
                response.raise_for_status()
                # The mirror serves the CSV gzip-compressed.
                with gzip.open(BytesIO(response.content), mode="rb") as f:
                    payload = f.read()
            except Exception:
                log.exception(f"Unable to access HTTP server at {http_url}")

        if payload is None:
            raise FailedDownload("Unable to acquire ECCC stations list")

        return payload