From 2514c06ccd7224bb03bd748ac65734840d501153 Mon Sep 17 00:00:00 2001
From: "Phillip J. Wolfram"
Date: Wed, 29 Mar 2017 16:05:05 -0600
Subject: [PATCH 1/2] Fixes dask out of memory error (sets max chunks)

---
 config.default                                |  6 +++++
 .../generalized_reader/generalized_reader.py  | 24 +++++++++++++------
 .../shared/mpas_xarray/mpas_xarray.py         | 17 ++++++++++---
 mpas_analysis/test/test_generalized_reader.py |  4 +++-
 4 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/config.default b/config.default
index dff3e0835..7c53f3081 100644
--- a/config.default
+++ b/config.default
@@ -70,6 +70,12 @@ mpasMeshName = mesh
 # first be divided among the tasks before applying this fraction.
 autocloseFileLimitFraction = 0.5
 
+# Large datasets can encounter a memory error. Specification of a maximum
+# chunk size `maxChunkSize` can be helpful to prevent the memory error. The
+# current maximum chunk size assumes approximately 64GB of ram and large files
+# with a single time slice.
+maxChunkSize = 10000
+
 [output]
 ## options related to writing out plots, intermediate cached data sets, logs,
 ## etc.
diff --git a/mpas_analysis/shared/generalized_reader/generalized_reader.py b/mpas_analysis/shared/generalized_reader/generalized_reader.py
index 2d7d71cbd..78531d4a8 100644
--- a/mpas_analysis/shared/generalized_reader/generalized_reader.py
+++ b/mpas_analysis/shared/generalized_reader/generalized_reader.py
@@ -109,12 +109,16 @@ def open_multifile_dataset(fileNames, calendar, config,
 
     Author
     ------
-    Xylar Asay-Davis
+    Xylar Asay-Davis, Phillip J. Wolfram
 
     Last modified
     -------------
-    02/23/2017
+    03/29/2017
     """
+
+    # limit chunk size to prevent memory error
+    maxChunkSize = config.getint('input', 'maxChunkSize')
+
     preprocess_partial = partial(_preprocess,
                                  calendar=calendar,
                                  simulationStartTime=simulationStartTime,
@@ -124,7 +128,8 @@ def open_multifile_dataset(fileNames, calendar, config,
                                  iselValues=iselValues,
                                  variableMap=variableMap,
                                  startDate=startDate,
-                                 endDate=endDate)
+                                 endDate=endDate,
+                                 maxChunkSize=maxChunkSize)
 
     kwargs = {'decode_times': False,
               'concat_dim': 'Time'}
@@ -179,7 +184,7 @@ def _preprocess(ds, calendar, simulationStartTime, timeVariableName,
                 variableList, selValues, iselValues, variableMap,
-                startDate, endDate):  # {{{
+                startDate, endDate, maxChunkSize):  # {{{
     """
     Performs variable remapping, then calls mpas_xarray.preprocess, to
     perform the remainder of preprocessing.
 
@@ -242,6 +247,10 @@ def _preprocess(ds, calendar, simulationStartTime, timeVariableName,
         If present, the first and last dates to be used in the data set.  The
         time variable is sliced to only include dates within this range.
 
+    maxChunkSize : int
+        Specifies the maximum chunk size to limit chunks used by dask to
+        prevent out of memory errors for large datasets.
+
     Returns
     -------
     ds : xarray.DataSet object
@@ -250,11 +259,11 @@ def _preprocess(ds, calendar, simulationStartTime, timeVariableName,
 
     Authors
     -------
-    Xylar Asay-Davis
+    Xylar Asay-Davis, Phillip J. Wolfram
 
     Last modified
     -------------
-    02/17/2017
+    03/30/2017
     """
     submap = variableMap
@@ -283,7 +292,8 @@ def _preprocess(ds, calendar, simulationStartTime, timeVariableName,
                                    timeVariableName=timeVariableName,
                                    variableList=variableList,
                                    selValues=selValues,
-                                   iselValues=iselValues)
+                                   iselValues=iselValues,
+                                   maxChunkSize=maxChunkSize)
 
     return ds  # }}}
 
diff --git a/mpas_analysis/shared/mpas_xarray/mpas_xarray.py b/mpas_analysis/shared/mpas_xarray/mpas_xarray.py
index 545be4411..be68316ab 100644
--- a/mpas_analysis/shared/mpas_xarray/mpas_xarray.py
+++ b/mpas_analysis/shared/mpas_xarray/mpas_xarray.py
@@ -177,7 +177,7 @@ def subset_variables(ds, variableList):  # {{{
 
 
 def preprocess(ds, calendar, simulationStartTime, timeVariableName,
-               variableList, selValues, iselValues):  # {{{
+               variableList, selValues, iselValues, maxChunkSize=1000):  # {{{
     """
     Builds correct time specification for MPAS, allowing a date offset
     because the time must be between 1678 and 2262 based on the xarray
@@ -229,6 +229,10 @@ def preprocess(ds, calendar, simulationStartTime, timeVariableName,
 
             iselValues = {'nVertLevels': slice(0, 3),
                           'nCells': cellIDs}
 
+    maxChunkSize : int
+        Specifies the maximum chunk size to limit chunks used by dask to
+        prevent out of memory errors for large datasets.
+
     Returns
     -------
     ds : xarray.DataSet object
@@ -242,7 +246,7 @@ def preprocess(ds, calendar, simulationStartTime, timeVariableName,
 
     Last modified
     -------------
-    02/17/2017
+    03/30/2017
     """
 
     ds = _parse_dataset_time(ds=ds,
@@ -265,6 +269,13 @@ def preprocess(ds, calendar, simulationStartTime, timeVariableName,
     if iselValues is not None:
         ds = ds.isel(**iselValues)
 
+    chunks = {}
+    for name in ds.chunks.keys():
+        chunklim = np.asarray(ds.chunks[name]).max()
+        chunks[name] = np.minimum(maxChunkSize, chunklim)
+
+    ds = ds.chunk(chunks)
+
     return ds  # }}}
 
@@ -502,7 +513,7 @@ def _parse_dataset_time(ds, inTimeVariableName, calendar,
                                 'simulationStartTime was not '
                                 'supplied.'.format(inTimeVariableName))
 
-        if (string_to_datetime(referenceDate) == 
+        if (string_to_datetime(referenceDate) ==
                 string_to_datetime(simulationStartTime)):
             days = timeVar.values
         else:
diff --git a/mpas_analysis/test/test_generalized_reader.py b/mpas_analysis/test/test_generalized_reader.py
index 905bf580b..351bb07dd 100644
--- a/mpas_analysis/test/test_generalized_reader.py
+++ b/mpas_analysis/test/test_generalized_reader.py
@@ -16,11 +16,13 @@
 @pytest.mark.usefixtures("loaddatadir")
 class TestGeneralizedReader(TestCase):
 
-    def setup_config(self, autocloseFileLimitFraction=0.5):
+    def setup_config(self, autocloseFileLimitFraction=0.5,
+                     maxChunkSize=10000):
         config = MpasAnalysisConfigParser()
         config.add_section('input')
         config.set('input', 'autocloseFileLimitFraction',
                    str(autocloseFileLimitFraction))
+        config.set('input', 'maxChunkSize', str(maxChunkSize))
         return config
 
     def test_variableMap(self):

From 3b08774faa432f4bc13ff0ba797c80b2f367facf Mon Sep 17 00:00:00 2001
From: "Phillip J. Wolfram"
Date: Wed, 29 Mar 2017 15:51:52 -0700
Subject: [PATCH 2/2] Note out of memory possibility in MOC in comment

---
 mpas_analysis/ocean/meridional_overturning_circulation.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/mpas_analysis/ocean/meridional_overturning_circulation.py b/mpas_analysis/ocean/meridional_overturning_circulation.py
index 1593aa430..6d4fcf86c 100644
--- a/mpas_analysis/ocean/meridional_overturning_circulation.py
+++ b/mpas_analysis/ocean/meridional_overturning_circulation.py
@@ -305,7 +305,8 @@ def _compute_moc_climo_postprocess(config, runStreams, variableMap, calendar,
     # Compute annual climatology
     annualClimatology = ds.mean('Time')
 
-    # Convert to numpy arrays
+    # Convert to numpy arrays
+    # (can result in a memory error for large array size)
     horizontalVel = annualClimatology.avgNormalVelocity.values
     verticalVel = annualClimatology.avgVertVelocityTop.values
     velArea = verticalVel * areaCell[:, np.newaxis]