Skip to content

Commit

Permalink
Fix threading issues between rpy2 and dask
Browse files Browse the repository at this point in the history
  • Loading branch information
J.R. Angevaare committed Aug 10, 2023
1 parent 3eb52b8 commit 8d47429
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion optim_esm_tools/analyze/region_finding.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,26 @@ def store_mask(self, mask, m_i, store_masks=True):
for k, v in self.read_ds_kw.items()
if k not in 'max_time min_time'.split()
}
self.log.debug('start start cal')

# The load() call below is essential and must not be removed: without it, a threading
# race condition occurs between dask and rpy2.
ds_masked = ds_masked.load() # do not delete!

statistics = TimeStatistics(
ds_masked, calculation_kwargs=dict(max_jump=kw, n_std_global=kw)
).calculate_statistics()
ds_masked.attrs.update(statistics)
self.log.debug(f'done start cal {statistics}')
ds_masked.attrs.update(
{k: (v if v is not None else 'None') for k, v in statistics.items()}
)
ds_masked.to_netcdf(
os.path.join(
store_in_dir,
f'{self.title_label}_cluster-{m_i}_v{_CMIP_HANDLER_VERSION}.nc',
)
)
self.log.debug('done save')

def _summarize_mask(self, mask, plot=None):
axes = oet.plotting.map_maker.summarize_mask(self.data_set, mask, plot=plot)
Expand Down

0 comments on commit 8d47429

Please sign in to comment.