3 changes: 3 additions & 0 deletions config.default
@@ -76,6 +76,9 @@ autocloseFileLimitFraction = 0.5
# with a single time slice.
maxChunkSize = 10000

# Reasonable size for an array to be cached into memory
reasonableArraySizeGB = 3.0

Collaborator:
Nice addition. I assume your plan was to use this in places other than MOC down the line?

Contributor Author:
Correct. We would want to use something like load anywhere a .values cache would be desirable.
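A minimal sketch of that intended pattern (hypothetical call site: the config object and dataset ds stand in for whatever the analysis task has in scope; preload and reasonableArraySizeGB are from this PR):

from functools import partial
from mpas_analysis.shared.generalized_reader.generalized_reader import preload

# bind the configured size cap once, mirroring xarray's load() method
load = partial(preload, maxgb=config.getfloat('input',
                                              'reasonableArraySizeGB'))

# before: unconditionally realizes the array in memory
#     velocity = ds.avgNormalVelocity.values
# after: loads into memory only when the array fits under the cap,
# otherwise leaves it lazy (e.g. dask-backed)
velocity = load(ds.avgNormalVelocity)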

[output]
## options related to writing out plots, intermediate cached data sets, logs,
## etc.
165 changes: 72 additions & 93 deletions mpas_analysis/ocean/meridional_overturning_circulation.py
@@ -12,9 +12,10 @@

Last Modified
-------------
03/23/2017
03/28/2017
"""

from functools import partial
import xarray as xr
import numpy as np
import netCDF4
@@ -28,7 +29,7 @@
from ..shared.io.utility import build_config_full_path

from ..shared.generalized_reader.generalized_reader \
import open_multifile_dataset
import open_multifile_dataset, preload

from ..shared.timekeeping.utility import get_simulation_start_time, \
days_to_datetime
@@ -203,23 +204,18 @@ def _load_mesh(runStreams): # {{{
except ValueError:
raise IOError('No MPAS-O restart file found: need at least one '
'restart file for MOC calculation')
ncFile = netCDF4.Dataset(restartFile, mode='r')
dvEdge = ncFile.variables['dvEdge'][:]
areaCell = ncFile.variables['areaCell'][:]
refBottomDepth = ncFile.variables['refBottomDepth'][:]
latCell = ncFile.variables['latCell'][:]
ds = xr.open_dataset(restartFile)
latCell = ds.latCell
latCell = latCell * rad_to_deg # convert to degrees
ncFile.close()
nVertLevels = len(refBottomDepth)
refTopDepth = np.zeros(nVertLevels+1)
refTopDepth[1:nVertLevels+1] = refBottomDepth[0:nVertLevels]
refLayerThickness = np.zeros(nVertLevels)
refLayerThickness[0] = refBottomDepth[0]
refLayerThickness[1:nVertLevels] = (refBottomDepth[1:nVertLevels] -
refBottomDepth[0:nVertLevels-1])
refTopDepth = xr.DataArray(np.hstack((0, ds.refBottomDepth.values)),
coords=[ds.nVertLevelsP1])
refLayerThickness = xr.DataArray(np.hstack((ds.refBottomDepth[0].values,
ds.refBottomDepth.diff('nVertLevels').values)),
coords=[ds.nVertLevels])

return dvEdge, areaCell, refBottomDepth, latCell, nVertLevels, \
refTopDepth, refLayerThickness # }}}
return ds.dvEdge, ds.areaCell, ds.refBottomDepth, latCell, \
    ds.nVertLevels, ds.nVertLevelsP1, \
refTopDepth, refLayerThickness # }}}


def _compute_moc_climo_postprocess(config, runStreams, variableMap, calendar,
@@ -228,7 +224,7 @@ def _compute_moc_climo_postprocess(config, runStreams, variableMap, calendar,
'''compute mean MOC streamfunction as a post-process'''

dvEdge, areaCell, refBottomDepth, latCell, nVertLevels, \
refTopDepth, refLayerThickness = _load_mesh(runStreams)
nVertLevelsP1, refTopDepth, refLayerThickness = _load_mesh(runStreams)

variableList = ['avgNormalVelocity',
'avgVertVelocityTop']
@@ -240,30 +236,22 @@
raise IOError('Regional masking file for MOC calculation '
'does not exist')
iRegion = 0
dictRegion = {}
for region in regionNames:
print '\n Reading region and transect mask for {}...'.format(region)
ncFileRegional = netCDF4.Dataset(regionMaskFiles, mode='r')
maxEdgesInTransect = \
ncFileRegional.dimensions['maxEdgesInTransect'].size
dsRegion = xr.open_dataset(regionMaskFiles)
transectEdgeMaskSigns = \
ncFileRegional.variables['transectEdgeMaskSigns'][:, iRegion]
transectEdgeGlobalIDs = \
ncFileRegional.variables['transectEdgeGlobalIDs'][iRegion, :]
regionCellMask = \
ncFileRegional.variables['regionCellMasks'][:, iRegion]
ncFileRegional.close()
transectEdgeMaskSigns = \
    dsRegion.transectEdgeMaskSigns.isel(nTransects=iRegion)
regionCellMask = dsRegion.regionCellMasks.isel(nRegions=iRegion)
iRegion += 1

indRegion = np.where(regionCellMask == 1)
dictRegion = {
'ind{}'.format(region): indRegion,
# this will only have the last entry in regionNames
dictRegion.update({
'{}CellMask'.format(region): regionCellMask,
'maxEdgesInTransect{}'.format(region): maxEdgesInTransect,
'transectEdgeMaskSigns{}'.format(region): transectEdgeMaskSigns,
'transectEdgeGlobalIDs{}'.format(region): transectEdgeGlobalIDs}
'transectEdgeMaskSigns{}'.format(region): transectEdgeMaskSigns})
# Add Global regionCellMask=1 everywhere to make the algorithm
# for the global moc similar to that of the regional moc
dictRegion['GlobalCellMask'] = np.ones(np.size(latCell))
dictRegion['GlobalCellMask'] = xr.ones_like(regionCellMask).rename('GlobalCellMask')
regionNames[:0] = ['Global']

# Compute and plot annual climatology of MOC streamfunction
@@ -306,11 +294,11 @@ def _compute_moc_climo_postprocess(config, runStreams, variableMap, calendar,
# Compute annual climatology
annualClimatology = ds.mean('Time')

# Convert to numpy arrays
# (can result in a memory error for large array size)
horizontalVel = annualClimatology.avgNormalVelocity.values
verticalVel = annualClimatology.avgVertVelocityTop.values
velArea = verticalVel * areaCell[:, np.newaxis]
# Cache results if their size is reasonable
load = partial(preload, maxgb=config.getfloat('input',
'reasonableArraySizeGB'))
horizontalVel = load(annualClimatology.avgNormalVelocity)
velArea = load(annualClimatology.avgVertVelocityTop * areaCell)

# Create dictionary for MOC climatology (NB: need this form
# in order to convert it to xarray dataset later in the script)
@@ -319,33 +307,28 @@ def _compute_moc_climo_postprocess(config, runStreams, variableMap, calendar,
print ' Compute {} MOC...'.format(region)
print ' Compute transport through region southern transect...'
if region == 'Global':
transportZ = np.zeros(nVertLevels)
transportZ = xr.zeros_like(nVertLevels).rename('transportZ')
else:
maxEdgesInTransect = \
dictRegion['maxEdgesInTransect{}'.format(region)]
transectEdgeGlobalIDs = \
dictRegion['transectEdgeGlobalIDs{}'.format(region)]
transectEdgeMaskSigns = \
dictRegion['transectEdgeMaskSigns{}'.format(region)]
transportZ = _compute_transport(maxEdgesInTransect,
transectEdgeGlobalIDs,
transectEdgeMaskSigns,
transportZ = _compute_transport(transectEdgeMaskSigns,
nVertLevels, dvEdge,
refLayerThickness,
horizontalVel)
horizontalVel, load)

regionCellMask = dictRegion['{}CellMask'.format(region)]
latBinSize = config.getExpression(sectionName,
'latBinSize{}'.format(region))
if region == 'Global':
latBins = np.arange(-90.0, 90.1, latBinSize)
else:
indRegion = dictRegion['ind{}'.format(region)]
latBins = latCell[indRegion]
cellMask = dictRegion['{}CellMask'.format(region)]
latBins = latCell.where(cellMask)
latBins = np.arange(np.amin(latBins),
np.amax(latBins)+latBinSize,
latBinSize)
mocTop = _compute_moc(latBins, nVertLevels, latCell,
latBins = xr.DataArray(latBins, coords=[latBins], dims=['latBins'])
mocTop = _compute_moc(latBins, nVertLevelsP1, latCell,
regionCellMask, transportZ, velArea)

# Store computed MOC to dictionary
@@ -363,7 +346,7 @@ def _compute_moc_climo_postprocess(config, runStreams, variableMap, calendar,
latBins = mocDictClimo['lat{}'.format(region)]['data']
mocTop = mocDictClimo['moc{}'.format(region)]['data']
ncFile.createDimension('nx{}'.format(region), len(latBins))
# create variables
# create variables
x = ncFile.createVariable('lat{}'.format(region), 'f4',
('nx{}'.format(region),))
x.description = 'latitude bins for MOC {}'\
@@ -374,7 +357,7 @@ def _compute_moc_climo_postprocess(config, runStreams, variableMap, calendar,
y.description = 'MOC {} streamfunction, annual'\
' climatology'.format(region)
y.units = 'Sv (10^6 m^3/s)'
# save variables
# save variables
x[:] = latBins
y[:, :] = mocTop
depth = ncFile.createVariable('depth', 'f4', ('nz',))
@@ -412,7 +395,8 @@ def write_file(dsMOCTimeSeries):
dsMOCTimeSeries.Time.attrs['units'] = 'days since 0001-01-01'
dsMOCTimeSeries.mocAtlantic26.attrs['units'] = 'Sv (10^6 m^3/s)'
dsMOCTimeSeries.mocAtlantic26.attrs['description'] = \
'Max MOC Atlantic streamfunction nearest to RAPID Array latitude (26.5N)'
('Max MOC Atlantic streamfunction nearest to '
'RAPID Array latitude (26.5N)')
dsMOCTimeSeries.to_netcdf(outputFileTseries)
return dsMOCTimeSeries

@@ -426,7 +410,7 @@ def write_file(dsMOCTimeSeries):
'avgVertVelocityTop']

dvEdge, areaCell, refBottomDepth, latCell, nVertLevels, \
refTopDepth, refLayerThickness = _load_mesh(runStreams)
nVertLevelsP1, refTopDepth, refLayerThickness = _load_mesh(runStreams)

ds = open_multifile_dataset(fileNames=dictTseries['inputFilesTseries'],
calendar=calendar,
@@ -438,11 +422,7 @@ def write_file(dsMOCTimeSeries):
startDate=dictTseries['startDateTseries'],
endDate=dictTseries['endDateTseries'])
latAtlantic = mocDictClimo['latAtlantic']['data']
dLat = latAtlantic - 26.5
indlat26 = np.where(dLat == np.amin(np.abs(dLat)))

maxEdgesInTransect = dictRegion['maxEdgesInTransectAtlantic']
transectEdgeGlobalIDs = dictRegion['transectEdgeGlobalIDsAtlantic']
transectEdgeMaskSigns = dictRegion['transectEdgeMaskSignsAtlantic']
regionCellMask = dictRegion['AtlanticCellMask']

@@ -484,18 +464,18 @@ def write_file(dsMOCTimeSeries):
first = False
date = days_to_datetime(ds.Time[tIndex], calendar=calendar)
print ' date: {:04d}-{:02d}'.format(date.year, date.month)
horizontalVel = ds.avgNormalVelocity[tIndex, :, :].values
verticalVel = ds.avgVertVelocityTop[tIndex, :, :].values
velArea = verticalVel * areaCell[:, np.newaxis]
transportZ = _compute_transport(maxEdgesInTransect,
transectEdgeGlobalIDs,
transectEdgeMaskSigns,
# Cache results if their size is reasonable
load = partial(preload, maxgb=config.getfloat('input',
'reasonableArraySizeGB'))
horizontalVel = load(ds.avgNormalVelocity.isel(Time=tIndex))
velArea = load(ds.avgVertVelocityTop.isel(Time=tIndex) * areaCell)
Collaborator:
Wouldn't it actually be clearer to read if you just cached the value of reasonableArraySizeGB and had 2 calls to preload without the need to use partial? If we want future developers to use preload, I think we want it to be clear how to use it.

Contributor Author:
The reason I did it this way is to parallel the xarray load method. Also, it seems like reasonableArraySizeGB isn't really something we need to change per call to preload. But partial is confusing for first-time users. I also thought about making reasonableArraySizeGB a global but decided against it for obvious reasons.

Do you think this needs to be changed?

Collaborator:
It's kind of a style choice but I'd prefer having it be as straightforward as possible for someone who might use this code as a starting point to figure out how to do something similar in another analysis.
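For reference, the two call styles under discussion, as a sketch (maxGB is a hypothetical local name; everything else appears in this diff):

# style used in this PR: bind the size cap once with functools.partial
load = partial(preload, maxgb=config.getfloat('input',
                                              'reasonableArraySizeGB'))
horizontalVel = load(ds.avgNormalVelocity.isel(Time=tIndex))

# suggested alternative: cache the cap and call preload directly
maxGB = config.getfloat('input', 'reasonableArraySizeGB')
horizontalVel = preload(ds.avgNormalVelocity.isel(Time=tIndex), maxgb=maxGB)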

transportZ = _compute_transport(transectEdgeMaskSigns,
nVertLevels, dvEdge,
refLayerThickness,
horizontalVel)
mocTop = _compute_moc(latAtlantic, nVertLevels, latCell,
horizontalVel, load)
mocTop = _compute_moc(latAtlantic, nVertLevelsP1, latCell,
regionCellMask, transportZ, velArea)
mocAtlantic26 = np.amax(mocTop[:, indlat26])
mocAtlantic26 = mocTop.sel(latBins=26.5, method='nearest').max()

dictonary = {'Time': {'dims': ('Time'), 'data': [ds.Time[tIndex]]},
'mocAtlantic26': {'dims': ('Time'),
@@ -523,41 +503,40 @@ def write_file(dsMOCTimeSeries):
# return (mocDictClimo, mocDictTseries)


def _compute_transport(maxEdgesInTransect, transectEdgeGlobalIDs,
transectEdgeMaskSigns, nz, dvEdge, refLayerThickness,
horizontalVel): # {{{
def _compute_transport(transectEdgeMaskSigns, nz, dvEdge, refLayerThickness,
horizontalVel, load): # {{{

'''compute mass transport across southern transect of ocean basin'''

transportZEdge = np.zeros([nz, maxEdgesInTransect])
for i in range(maxEdgesInTransect):
if transectEdgeGlobalIDs[i] == 0:
break
# subtract 1 because of python 0-indexing
iEdge = transectEdgeGlobalIDs[i] - 1
transportZEdge[:, i] = horizontalVel[iEdge, :] * \
transectEdgeMaskSigns[iEdge, np.newaxis] * \
dvEdge[iEdge, np.newaxis] * \
refLayerThickness[np.newaxis, :]
transportZ = transportZEdge.sum(axis=1)
return transportZ # }}}
transect = load(transectEdgeMaskSigns != 0)
transportZ = (refLayerThickness*(
horizontalVel.where(transect, drop=True)*
transectEdgeMaskSigns.where(transect, drop=True)*
dvEdge.where(transect, drop=True)).sum('nEdges'))

return load(transportZ) # }}}


def _compute_moc(latBins, nz, latCell, regionCellMask, transportZ,
velArea): # {{{

'''compute meridionally integrated MOC streamfunction'''

mocTop = np.zeros([np.size(latBins), nz+1])
mocTop[0, range(1, nz+1)] = transportZ.cumsum()
for iLat in range(1, np.size(latBins)):
indlat = np.logical_and(np.logical_and(
regionCellMask == 1, latCell >= latBins[iLat-1]),
latCell < latBins[iLat])
mocTop[iLat, :] = mocTop[iLat-1, :] + velArea[indlat, :].sum(axis=0)
# note computation should be more cleanly facilitated
# via a multidimensional groupby_bins when available
# (see https://github.com/pydata/xarray/pull/924)

mask = regionCellMask.values > 0
velArea = velArea.values[mask, :]
latCell = latCell.values[mask]
mocTop = np.zeros([len(nz), len(latBins)])
mocTop[1:, 0] = transportZ.values.cumsum(axis=0)
for iLat in np.arange(1, len(latBins)):
mask = np.logical_and(latCell >= latBins[iLat-1].values,
latCell < latBins[iLat].values)
mocTop[:, iLat] = mocTop[:, iLat-1] + \
velArea[mask, :].sum(axis=0)
# convert m^3/s to Sverdrup
mocTop = mocTop * m3ps_to_Sv
mocTop = mocTop.T
mocTop = xr.DataArray(mocTop*m3ps_to_Sv, coords=[nz, latBins])
return mocTop # }}}

# vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python
57 changes: 57 additions & 0 deletions mpas_analysis/shared/generalized_reader/generalized_reader.py
@@ -182,6 +182,35 @@ def open_multifile_dataset(fileNames, calendar, config,
return ds # }}}


def preload(xarrayobj, maxgb): # {{{
"""
Loads the xarray object into memory if it is of an acceptable size.

Parameters
----------
xarrayobj : xarray.Dataset or xarray.DataArray
xarray object to load into memory if it is of a reasonable size.

maxgb : float
    Maximum size in GB of an array that will be loaded into memory.

Returns
-------
xarrayobj : xarray.Dataset or xarray.DataArray

Authors
-------
Phillip J. Wolfram

Last modified
-------------
03/30/2017
"""
if _size_gb(xarrayobj) < maxgb:
xarrayobj.load()
return xarrayobj # }}}
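A usage sketch for preload (illustrative array, not from this PR; the 3.0 GB cap mirrors the reasonableArraySizeGB default added to config.default):

import numpy as np
import xarray as xr

da = xr.DataArray(np.zeros((1024, 1024)))  # 8 MB of float64
da = preload(da, maxgb=3.0)  # well under the cap, so loaded eagerly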


def _preprocess(ds, calendar, simulationStartTime, timeVariableName,
variableList, selValues, iselValues, variableMap,
startDate, endDate, maxChunkSize): # {{{
@@ -298,6 +327,34 @@ def _preprocess(ds, calendar, simulationStartTime, timeVariableName,
return ds # }}}


def _size_gb(xarrayobj): #{{{
"""
Computes the size of an xarray object in GB.

Parameters
----------
xarrayobj : xarray.Dataset or xarray.DataArray
    xarray object whose size in GB is computed.

Returns
-------
gbs : float
size of xarrayobj in GB.

Authors
-------
Phillip J. Wolfram

Last modified
-------------
03/30/2017
"""

gbs = xarrayobj.nbytes * 2 ** -30

return gbs #}}}
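A quick worked check of the conversion (illustrative, not from this PR): nbytes is in bytes, and multiplying by 2 ** -30 converts bytes to GB (gibibytes).

import numpy as np
import xarray as xr

da = xr.DataArray(np.zeros((512, 512, 8)))  # float64: 2**24 bytes
assert _size_gb(da) == 0.015625  # 2**24 B * 2**-30 = 2**-6 GB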


def _map_variable_name(variableName, ds, variableMap): # {{{
"""
Given a `variableName` in a `variableMap` and an xarray `ds`,