-
Notifications
You must be signed in to change notification settings - Fork 7
/
core.py
318 lines (267 loc) · 8.96 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
"""Core logic for bias-correction and downscaling
Math stuff and business logic goes here. This is the "business logic".
"""
import logging
from skdownscale.spatial_models import SpatialDisaggregator
import xarray as xr
from xclim import sdba
from xclim.core.calendar import convert_calendar
import xesmf as xe
logger = logging.getLogger(__name__)
# Break this down into a submodule(s) if needed.
# Assume data input here is generally clean and valid.
def train_quantiledeltamapping(
    reference, historical, variable, kind, quantiles_n=100, window_n=31
):
    """Train a quantile delta mapping (QDM) adjustment object.

    Parameters
    ----------
    reference : xr.Dataset
        Dataset to use as model reference.
    historical : xr.Dataset
        Dataset to use as historical simulation.
    variable : str
        Name of target variable to extract from `historical` and `reference`.
    kind : {"+", "*"}
        Kind of variable. Used for QDM scaling.
    quantiles_n : int, optional
        Number of quantiles for QDM.
    window_n : int, optional
        Centered window size for day-of-year grouping.

    Returns
    -------
    xclim.sdba.adjustment.QuantileDeltaMapping
    """
    # Group by day of year with a centered window so each day is trained
    # against its neighbors.
    doy_grouper = sdba.Grouper("time.dayofyear", window=int(window_n))
    model = sdba.adjustment.QuantileDeltaMapping(
        nquantiles=int(quantiles_n),
        kind=str(kind),
        group=doy_grouper,
    )
    model.train(ref=reference[variable], hist=historical[variable])
    return model
def adjust_quantiledeltamapping_year(
    simulation, qdm, year, variable, halfyearwindow_n=10
):
    """Apply QDM to adjust a year within a simulation.

    Parameters
    ----------
    simulation : xr.Dataset
        Daily simulation data to be adjusted. Must have sufficient observations
        around `year` to adjust.
    qdm : xr.Dataset or sdba.adjustment.QuantileDeltaMapping
        Trained ``xclim.sdba.adjustment.QuantileDeltaMapping``, or
        Dataset representation that will be instantiate
        ``xclim.sdba.adjustment.QuantileDeltaMapping``.
    year : int
        Target year to adjust, with rolling years and day grouping.
    variable : str
        Target variable in `simulation` to adjust. Adjusted output will share the
        same name.
    halfyearwindow_n : int, optional
        Half-length of the annual rolling window to extract along either
        side of `year`.

    Returns
    -------
    out : xr.Dataset
        QDM-adjusted values from `simulation`. May be a lazy-evaluated future, not
        yet computed.
    """
    year = int(year)
    variable = str(variable)
    halfyearwindow_n = int(halfyearwindow_n)

    # Accept either a trained QDM object or its Dataset serialization.
    if isinstance(qdm, xr.Dataset):
        qdm = sdba.adjustment.QuantileDeltaMapping.from_dataset(qdm)

    # Pad the annual window by 15 days on each end so the rolling 31-day
    # day-of-year grouping has full context at the year boundaries.
    first_day = f"{year - halfyearwindow_n - 1}-12-17"
    last_day = f"{year + halfyearwindow_n + 1}-01-15"
    # TODO: Need a check to ensure we have all the data in this slice!
    target = simulation[variable].sel(time=slice(first_day, last_day))

    adjusted = qdm.adjust(target, interp="nearest").sel(time=str(year))
    return adjusted.to_dataset(name=variable)
def apply_bias_correction(
    gcm_training_ds,
    obs_training_ds,
    gcm_predict_ds,
    train_variable,
    out_variable,
    method,
):
    """Bias correct input model data using specified method,
    using either monthly or +/- 15 day time grouping. Currently
    the QDM method is supported.

    Parameters
    ----------
    gcm_training_ds : Dataset
        training model data for building quantile map
    obs_training_ds : Dataset
        observation data for building quantile map
    gcm_predict_ds : Dataset
        future model data to be bias corrected
    train_variable : str
        variable name used in training data
    out_variable : str
        variable name used in downscaled output
    method : {"QDM"}
        method to be used in the applied bias correction

    Returns
    -------
    ds_predicted : xr.Dataset
        Dataset that has been bias corrected.
    """
    # Guard: QDM is the only supported bias-correction method.
    if method != "QDM":
        raise ValueError("this method is not supported")

    # Day-of-year grouper with a centered +/- 15 day (31-day) window.
    doy_group = sdba.Grouper("time.dayofyear", window=31)
    qdm = sdba.adjustment.QuantileDeltaMapping(group=doy_group, kind="+")
    qdm.train(
        ref=obs_training_ds[train_variable], hist=gcm_training_ds[train_variable]
    )
    adjusted = qdm.adjust(sim=gcm_predict_ds[train_variable])
    return adjusted.to_dataset(name=out_variable)
def apply_downscaling(
    bc_ds,
    obs_climo_coarse,
    obs_climo_fine,
    train_variable,
    out_variable,
    method,
    domain_fine,
    weights_path=None,
):
    """Downscale input bias corrected data using specified method.
    Currently only the BCSD method for spatial disaggregation is
    supported.

    Parameters
    ----------
    bc_ds : Dataset
        Model data that has already been bias corrected.
    obs_climo_coarse : Dataset
        Observation climatologies at coarse resolution.
    obs_climo_fine : Dataset
        Observation climatologies at fine resolution.
    train_variable : str
        Variable name used in obs data.
    out_variable : str
        Variable name used in downscaled output.
    method : {"BCSD"}
        Method to be used in the applied downscaling.
    domain_fine : Dataset
        Domain that specifies the fine resolution grid to downscale to.
    weights_path : str or None, optional
        Path to the weights file, used for downscaling to fine resolution.

    Returns
    -------
    af_fine : xr.Dataset
        A dataset of adjustment factors at fine resolution used in downscaling.
    ds_downscaled : xr.Dataset
        A model dataset that has been downscaled from the bias correction
        resolution to specified domain file resolution.
    """
    # Guard: BCSD is the only supported downscaling method.
    if method != "BCSD":
        raise ValueError("this method is not supported")

    model = SpatialDisaggregator(var=train_variable)
    # Fit coarse-resolution adjustment factors against the coarse climatology.
    af_coarse = model.fit(bc_ds, obs_climo_coarse, var_name=train_variable)

    # Regrid adjustment factors to the fine grid. BCSD uses bilinear
    # interpolation for both temperature and precip.
    af_fine = xesmf_regrid(af_coarse, domain_fine, "bilinear", weights_path)

    # Apply the fine-resolution adjustment factors to the fine climatology.
    predicted = model.predict(
        af_fine, obs_climo_fine[train_variable], var_name=train_variable
    )
    ds_downscaled = predicted.to_dataset(name=out_variable)
    return af_fine, ds_downscaled
def build_xesmf_weights_file(x, domain, method, filename=None):
    """Build ESMF weights file for regridding x to a global grid

    Parameters
    ----------
    x : xr.Dataset
    domain : xr.Dataset
        Domain to regrid to.
    method : str
        Method of regridding. Passed to ``xesmf.Regridder``.
    filename : optional
        Local path to output netCDF weights file.

    Returns
    -------
    outfilename : str
        Path to resulting weights file.
    """
    # Constructing the Regridder generates the weights; we only need the
    # path it was written to, not the regridder itself.
    regridder = xe.Regridder(x, domain, method=method, filename=filename)
    return str(regridder.filename)
def xesmf_regrid(x, domain, method, weights_path=None):
    """Regrid a dataset onto ``domain`` with xESMF.

    Parameters
    ----------
    x : xr.Dataset
    domain : xr.Dataset
        Domain to regrid to.
    method : str
        Method of regridding. Passed to ``xesmf.Regridder``.
    weights_path : str, optional
        Local path to netCDF file of pre-calculated XESMF regridding weights.

    Returns
    -------
    xr.Dataset
    """
    # Build the regridder (reusing pre-computed weights when given a path)
    # and immediately apply it to the input.
    regridder = xe.Regridder(x, domain, method=method, filename=weights_path)
    return regridder(x)
def standardize_gcm(ds, leapday_removal=True):
    """Clean a raw GCM dataset for downstream processing.

    Drops extraneous dimensions (``height``, ``member_id``, ``time_bnds``),
    selecting the first ensemble member when ``member_id`` is present, and
    optionally removes leap days by converting to a "noleap" calendar.

    Parameters
    ----------
    ds : xr.Dataset
        Raw input model dataset. Assumed to have ``lat``/``lon`` dims when
        ``leapday_removal`` is True.
    leapday_removal : bool, optional
        If True (default), remove leap days and rechunk the result.

    Returns
    -------
    xr.Dataset
    """
    dims_to_drop = [
        dim for dim in ("height", "member_id", "time_bnds") if dim in ds.dims
    ]
    if "member_id" in ds.dims:
        # Keep only the first ensemble member before dropping the dimension.
        ds_cleaned = ds.isel(member_id=0).drop(dims_to_drop)
    else:
        ds_cleaned = ds.drop(dims_to_drop)

    if not leapday_removal:
        return ds_cleaned

    # If the time coordinate is raw integers, xclim cannot interpret it;
    # decode via CF conventions first.
    if ds.time.dtype == "int64":
        ds_cleaned["time"] = xr.decode_cf(ds_cleaned).time

    # Remove leap days and update calendar.
    ds_noleap = xclim_remove_leapdays(ds_cleaned)

    # Rechunk so chunks are uniform sizes. ``Dataset.chunk`` takes a single
    # chunks argument (mapping of dim -> size); the previous positional call
    # ``chunk(730, len(ds.lat), len(ds.lon))`` silently passed the lat/lon
    # sizes into the unrelated ``name_prefix``/``token`` parameters.
    return ds_noleap.chunk({"time": 730, "lat": len(ds.lat), "lon": len(ds.lon)})
def xclim_remove_leapdays(ds):
    """Drop leap days by converting ``ds`` to a "noleap" calendar.

    Parameters
    ----------
    ds : xr.Dataset

    Returns
    -------
    xr.Dataset
    """
    return convert_calendar(ds, target="noleap")