Skip to content

Commit

Permalink
multiprocessing for vertical interp (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmaussion committed Oct 31, 2016
1 parent 01d9938 commit a4f1706
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
23 changes: 17 additions & 6 deletions salem/sio.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ def quick_map(self, ax=None, interp='nearest', **kwargs):
"""Make a plot of the DataArray."""
return self._quick_map(self._obj, ax=ax, interp=interp, **kwargs)

def interpz(self, zcoord, levels, dim_name=''):
def interpz(self, zcoord, levels, dim_name='', use_multiprocessing=True):
"""Interpolates the array along the vertical dimension
Parameters
Expand All @@ -627,6 +627,8 @@ def interpz(self, zcoord, levels, dim_name=''):
the levels at which to interpolate
dim_name: str
the name of the new dimension
use_multiprocessing: bool
set to false if, for some reason, you don't want to use mp
Returns
-------
Expand All @@ -637,7 +639,8 @@ def interpz(self, zcoord, levels, dim_name=''):
raise RuntimeError('zdimension not recognized')

data = wrftools.interp3d(self._obj.values, zcoord.values,
np.atleast_1d(levels))
np.atleast_1d(levels), use_multiprocessing=
use_multiprocessing)

dims = list(self._obj.dims)
zd = np.nonzero([self.z_dim == d for d in dims])[0][0]
Expand Down Expand Up @@ -700,7 +703,7 @@ def transform_and_add(self, other, grid=None, interp='nearest', ks=3,
new_name = v
self._obj[new_name] = out[v]

def wrf_zlevel(self, varname, levels=None):
def wrf_zlevel(self, varname, levels=None, use_multiprocessing=True):
"""Interpolates to a specified height above sea level.
Parameters
Expand All @@ -709,6 +712,8 @@ def wrf_zlevel(self, varname, levels=None):
the name of the variable to interpolate
levels: 1d array
levels at which to interpolate (default: some levels I thought of)
use_multiprocessing: bool
set to false if, for some reason, you don't want to use mp
Returns
-------
Expand All @@ -719,12 +724,14 @@ def wrf_zlevel(self, varname, levels=None):
1000, 2000, 3000, 5000, 7500, 10000])

zcoord = self._obj['Z']
out = self._obj[varname].salem.interpz(zcoord, levels, dim_name='z')
out = self._obj[varname].salem.interpz(zcoord, levels, dim_name='z',
use_multiprocessing=
use_multiprocessing)
out['z'].attrs['description'] = 'height above sea level'
out['z'].attrs['units'] = 'm'
return out

def wrf_plevel(self, varname, levels=None):
def wrf_plevel(self, varname, levels=None, use_multiprocessing=True):
"""Interpolates to a specified pressure level (hPa).
Parameters
Expand All @@ -733,6 +740,8 @@ def wrf_plevel(self, varname, levels=None):
the name of the variable to interpolate
levels: 1d array
levels at which to interpolate (default: some levels I thought of)
use_multiprocessing: bool
set to false if, for some reason, you don't want to use mp
Returns
-------
Expand All @@ -743,7 +752,9 @@ def wrf_plevel(self, varname, levels=None):
650, 600, 550, 500, 450, 400, 300, 200, 100])

zcoord = self._obj['PRESSURE']
out = self._obj[varname].salem.interpz(zcoord, levels, dim_name='p')
out = self._obj[varname].salem.interpz(zcoord, levels, dim_name='p',
use_multiprocessing=
use_multiprocessing)
out['p'].attrs['description'] = 'pressure'
out['p'].attrs['units'] = 'hPa'
return out
Expand Down
5 changes: 4 additions & 1 deletion salem/tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,11 @@ def test_3d_interp(self):
assert_allclose(out.sel(p=300.), ref_2d*0. + 300.)

ds = sio.open_wrf_dataset(get_demo_file('wrfout_d01.nc'))
ws_h = ds.isel(time=1).salem.wrf_zlevel('WS', levels=8000.)
ws_h = ds.isel(time=1).salem.wrf_zlevel('WS', levels=8000.,
use_multiprocessing=False)
assert np.all(np.isfinite(ws_h))
ws_h2 = ds.isel(time=1).salem.wrf_zlevel('WS', levels=8000.)
assert_allclose(ws_h, ws_h2)


class TestGeogridSim(unittest.TestCase):
Expand Down
44 changes: 35 additions & 9 deletions salem/wrftools.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

from salem import lazy_property, wgs84, gis

POOL = None


def _init_pool():
global POOL
if POOL is None:
import multiprocessing as mp
POOL = mp.Pool()


class Unstaggerer(object):
"""Duck NetCDF4.Variable class which "unstaggers" WRF variables.
Expand Down Expand Up @@ -405,7 +414,12 @@ def __getitem__(self, item):
var_classes.remove('AccumulatedVariable')


def interp3d(data, zcoord, levels):
def _interp1d(args):
f = interp1d(args[0], args[1], fill_value=np.NaN, bounds_error=False)
return f(args[2])


def interp3d(data, zcoord, levels, use_multiprocessing=True):
"""Interpolate on the first dimension of a 3d var
Useful for WRF pressure or geopotential levels
Expand All @@ -418,6 +432,8 @@ def interp3d(data, zcoord, levels):
same dims as data, the z coordinates of the data points
levels: 1darray
the levels at which to interpolate
use_multiprocessing: bool
set to false if, for some reason, you don't want to use mp
Returns
-------
Expand All @@ -433,15 +449,25 @@ def interp3d(data, zcoord, levels):
if ndims != 3:
raise ValueError('ndims must be 3')

# TODO: there got to be a faster way to do this
# same problem, no solution: http://stackoverflow.com/questions/27622808/
# fast-3d-interpolation-of-atmospheric-data-in-numpy-scipy
out = np.zeros((len(levels), data.shape[-2], data.shape[-1]))
for i in range(data.shape[-1]):
if use_multiprocessing:
inp = []
for j in range(data.shape[-2]):
f = interp1d(zcoord[:, j, i], data[:, j, i],
fill_value=np.NaN, bounds_error=False)
out[:, j, i] = f(levels)
for i in range(data.shape[-1]):
inp.append((zcoord[:, j, i], data[:, j, i], levels))
_init_pool()
out = POOL.map(_interp1d, inp, chunksize=1000)
out = np.asarray(out).T
out = out.reshape((len(levels), data.shape[-2], data.shape[-1]))
else:
# TODO: there got to be a faster way to do this
# same problem: http://stackoverflow.com/questions/27622808/
# fast-3d-interpolation-of-atmospheric-data-in-numpy-scipy
out = np.zeros((len(levels), data.shape[-2], data.shape[-1]))
for i in range(data.shape[-1]):
for j in range(data.shape[-2]):
f = interp1d(zcoord[:, j, i], data[:, j, i],
fill_value=np.NaN, bounds_error=False)
out[:, j, i] = f(levels)
return out


Expand Down

0 comments on commit a4f1706

Please sign in to comment.