-
Notifications
You must be signed in to change notification settings - Fork 7
/
cds.py
288 lines (242 loc) · 10.7 KB
/
cds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import datetime as dt
import logging
import os
import requests
import requests.adapters
from pprint import pformat
import cdsapi as cds
import numpy as np
import pandas as pd
import xarray as xr
from icenet.data.cli import download_args
from icenet.data.interfaces.downloader import ClimateDownloader
"""
Module to download hourly ERA5 reanalysis latitude-longitude maps,
compute daily averages, regrid them to the same EASE grid as the OSI-SAF sea
ice, data, and save as daily NetCDFs.
"""
class ERA5Downloader(ClimateDownloader):
"""Climate downloader to provide ERA5 reanalysis data from CDS API
:param identifier: how to identify this dataset
:param cdi_map: override the default ERA5Downloader.CDI_MAP variable map
:param use_toolbox: whether to use CDS toolbox for remote aggregation
:param show_progress: whether to show download progress
"""
CDI_MAP = {
'tas': '2m_temperature',
'ta': 'temperature', # 500
'tos': 'sea_surface_temperature',
'psl': 'surface_pressure',
'zg': 'geopotential', # 250 and 500
'hus': 'specific_humidity', # 1000
'rlds': 'surface_thermal_radiation_downwards',
'rsds': 'surface_solar_radiation_downwards',
'uas': '10m_u_component_of_wind',
'vas': '10m_v_component_of_wind',
}
def __init__(self,
*args,
identifier: str = "era5",
cdi_map: object = CDI_MAP,
use_toolbox: bool = False,
show_progress: bool = False,
**kwargs):
super().__init__(*args,
drop_vars=["lambert_azimuthal_equal_area"],
identifier=identifier,
**kwargs)
self.client = cds.Client(progress=show_progress)
self._cdi_map = cdi_map
self._use_toolbox = use_toolbox
self.download_method = self._single_api_download
if use_toolbox:
self.download_method = self._single_toolbox_download
if self._max_threads > 10:
logging.info("Upping connection limit for max_threads > 10")
adapter = requests.adapters.HTTPAdapter(
pool_connections=self._max_threads,
pool_maxsize=self._max_threads)
self.client.session.mount("https://", adapter)
def _single_toolbox_download(self, var: object, level: object,
req_dates: object, download_path: object):
"""Implements a single download from CDS Toolbox API
:param var:
:param level: the pressure level to download
:param req_dates: the request dates
:param download_path:
"""
logging.debug("Processing {} dates".format(len(req_dates)))
var_prefix = var[0:-(len(str(level)))] if level else var
params_dict = {
"realm": "c3s",
"project": "app-c3s-daily-era5-statistics",
"version": "master",
"workflow_name": "application",
"kwargs": {
"dataset": "reanalysis-era5-single-levels",
"product_type": "reanalysis",
"variable": self._cdi_map[var_prefix],
"pressure_level": "-",
"statistic": "daily_mean",
"year": req_dates[0].year,
"month": sorted(list(set([r.month for r in req_dates]))),
"frequency": "1-hourly",
"time_zone": "UTC+00:00",
"grid": "0.25/0.25",
"area": {
"lat": [
min([self.hemisphere_loc[0], self.hemisphere_loc[2]]),
max([self.hemisphere_loc[0], self.hemisphere_loc[2]])
],
"lon": [
min([self.hemisphere_loc[1], self.hemisphere_loc[3]]),
max([self.hemisphere_loc[1], self.hemisphere_loc[3]])
],
},
},
}
if level:
params_dict["kwargs"]["dataset"] = \
"reanalysis-era5-pressure-levels"
params_dict["kwargs"]["pressure_level"] = level
logging.debug("params_dict: {}".format(pformat(params_dict)))
result = self.client.service("tool.toolbox.orchestrator.workflow",
params=params_dict)
try:
logging.info("Downloading data for {}...".format(var))
logging.debug("Result: {}".format(result))
location = result[0]['location']
res = requests.get(location, stream=True)
logging.info("Writing data to {}".format(download_path))
with open(download_path, 'wb') as fh:
for r in res.iter_content(chunk_size=1024):
fh.write(r)
logging.info("Download completed: {}".format(download_path))
except Exception as e:
logging.exception("{} not deleted, look at the "
"problem".format(download_path))
raise RuntimeError(e)
def _single_api_download(self, var: str, level: object, req_dates: object,
download_path: object):
"""Implements a single download from CDS API
:param var:
:param level: the pressure level to download
:param req_dates: the request date
:param download_path:
"""
logging.debug("Processing {} dates".format(len(req_dates)))
var_prefix = var[0:-(len(str(level)))] if level else var
retrieve_dict = {
"product_type":
"reanalysis",
"variable":
self._cdi_map[var_prefix],
"year":
req_dates[0].year,
"month":
list(
set(["{:02d}".format(rd.month) for rd in sorted(req_dates)])
),
"day": ["{:02d}".format(d) for d in range(1, 32)],
"time": ["{:02d}:00".format(h) for h in range(0, 24)],
"format":
"netcdf",
"area":
self.hemisphere_loc,
}
dataset = "reanalysis-era5-single-levels"
if level:
dataset = "reanalysis-era5-pressure-levels"
retrieve_dict["pressure_level"] = level
try:
logging.info("Downloading data for {}...".format(var))
self.client.retrieve(dataset, retrieve_dict, download_path)
logging.info("Download completed: {}".format(download_path))
except Exception as e:
logging.exception("{} not deleted, look at the "
"problem".format(download_path))
raise RuntimeError(e)
def postprocess(self, var: str, download_path: object):
"""Processing of CDS downloaded files
If we've not used the toolbox to download the files, we have a lot of
hourly data to average out, which is taken care of here
:param var:
:param download_path:
"""
# if not self._use_toolbox:
logging.info("Postprocessing CDS API data at {}".format(download_path))
temp_path = "{}.bak{}".format(*os.path.splitext(download_path))
logging.debug("Moving to {}".format(temp_path))
os.rename(download_path, temp_path)
ds = xr.open_dataset(temp_path)
nom = list(ds.data_vars)[0]
da = getattr(ds.rename({nom: var}), var)
doy_counts = da.time.groupby("time.dayofyear").count()
# There are situations where the API will spit out unordered and
# partial data, so we ensure here means come from full days and don't
# leave gaps. If we can avoid expver with this, might as well, so
# that's second
# FIXME: This will cause issues for already processed latlon data
if len(doy_counts[doy_counts < 24]) > 0:
strip_dates_before = min([
dt.datetime.strptime(
"{}-{}".format(d,
pd.to_datetime(da.time.values[0]).year),
"%j-%Y")
for d in doy_counts[doy_counts < 24].dayofyear.values
])
da = da.where(da.time < pd.Timestamp(strip_dates_before), drop=True)
if 'expver' in da.coords:
logging.warning("expvers {} in coordinates, will process out but "
"this needs further work: expver needs storing for "
"later overwriting".format(da.expver))
# Ref: https://confluence.ecmwf.int/pages/viewpage.action?pageId=173385064
da = da.sel(expver=1).combine_first(da.sel(expver=5))
da = da.sortby("time").resample(time='1D').mean()
da.to_netcdf(download_path)
def additional_regrid_processing(self, datafile: str, cube_ease: object):
"""
:param datafile:
:param cube_ease:
"""
(datafile_path, datafile_name) = os.path.split(datafile)
var_name = datafile_path.split(os.sep)[self._var_name_idx]
if var_name == 'tos':
# Overwrite maksed values with zeros
logging.debug("ERA5 regrid postprocess: {}".format(var_name))
cube_ease.data[cube_ease.data.mask] = 0.
cube_ease.data = cube_ease.data.data
cube_ease.data = np.where(np.isnan(cube_ease.data), 0.,
cube_ease.data)
elif var_name in ['zg500', 'zg250']:
# Convert from geopotential to geopotential height
logging.debug("ERA5 additional regrid: {}".format(var_name))
cube_ease.data /= 9.80665
def main():
args = download_args(choices=["cdsapi", "toolbox"],
workers=True,
extra_args=((("-n", "--do-not-download"),
dict(dest="download",
action="store_false",
default=True)),
(("-p", "--do-not-postprocess"),
dict(dest="postprocess",
action="store_false",
default=True))))
logging.info("ERA5 Data Downloading")
era5 = ERA5Downloader(
var_names=args.vars,
dates=[
pd.to_datetime(date).date()
for date in pd.date_range(args.start_date, args.end_date, freq="D")
],
delete_tempfiles=args.delete,
download=args.download,
levels=args.levels,
max_threads=args.workers,
postprocess=args.postprocess,
north=args.hemisphere == "north",
south=args.hemisphere == "south",
use_toolbox=args.choice == "toolbox")
era5.download()
era5.regrid()