Skip to content

Commit

Permalink
add a merge method to deal with offset time series
Browse files Browse the repository at this point in the history
  • Loading branch information
J.R. Angevaare committed Sep 8, 2023
1 parent eaf9449 commit 456d3bf
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 1 deletion.
22 changes: 21 additions & 1 deletion optim_esm_tools/analyze/combine_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def _squash_variables(self) -> ty.Mapping:
return new_ds

@staticmethod
def _merge_squash(new_ds) -> xr.Dataset:
def _merge_squash(new_ds: dict) -> xr.Dataset:
try:
new_ds = xr.Dataset(**new_ds)
except TypeError as e: # pragma: no cover
Expand All @@ -104,6 +104,26 @@ def _merge_squash(new_ds) -> xr.Dataset:
new_ds[k].attrs = v.attrs
else:
new_ds[k] = v
dt = new_ds['time'].values[-1].year - new_ds['time'].values[0].year
if len(new_ds['time']) > dt + 1: # allow off by one
new_ds = self._fix_wong_merge(new_ds)
return new_ds

@staticmethod
def _fix_wong_merge(ds):
    """Collapse a time axis that was doubled by merging offset time series.

    The caller invokes this when the 'time' axis holds more entries than
    the number of years it spans. The input is treated as two interleaved
    series: every second time stamp is kept and, for each time-dependent
    variable, the interleaved half (even or odd steps) holding the most
    non-missing values is kept.

    :param ds: dataset with a 'time' dimension whose entries alternate
        between the two interleaved series.
    :return: a new dataset with ``len(ds['time']) // 2`` time steps.
        Variables without a 'time' dimension are carried over unchanged.
    """
    n_pairs = len(ds['time']) // 2
    # Stop at 2 * n_pairs so that an odd-length axis still yields two
    # halves of equal length (a trailing unpaired step is dropped).
    even_steps = slice(0, 2 * n_pairs, 2)
    odd_steps = slice(1, 2 * n_pairs, 2)

    new_ds = ds.copy()
    new_ds = new_ds.isel(time=slice(None, n_pairs))
    new_ds['time'] = ds['time'].isel(time=even_steps)
    for v, a in ds.data_vars.items():
        if 'time' not in a.dims:
            continue
        # Select by dimension name so 'time' need not be the first axis.
        on_even = a.isel(time=even_steps)
        on_odd = a.isel(time=odd_steps)
        # Compare the amount of non-missing data in both halves instead of
        # probing one fixed index: that works for short series and for
        # variables with extra dimensions. Ties keep the even steps.
        use_odd = int(on_odd.count()) > int(on_even.count())
        new_ds[v].data = (on_odd if use_odd else on_even).data
    return new_ds

def make_fig(self, ds=None, fig_kw=None, add_histograms=False):
Expand Down
112 changes: 112 additions & 0 deletions test/test_combine_variables.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import os
import tempfile
from unittest import main
from unittest import TestCase

import cftime
import numpy as np
import pytest
import xarray as xr

import optim_esm_tools as oet
from optim_esm_tools.analyze.combine_variables import VariableMerger


class TestCombineVariables(TestCase):
Expand Down Expand Up @@ -42,3 +49,108 @@ def test_merge_three(self):

def test_merge_w_hist(self):
    """Re-run ``test_merge_two`` with ``add_histograms=True``."""
    self.test_merge_two(add_histograms=True)


from hypothesis import given
from hypothesis import strategies as st

import xarray as xr
import numpy as np
import cftime


class TestVariableMerger(TestCase):
    """Construct a synthetic dataset and check ``VariableMerger`` start-up state.

    This unittest was written using the help of CHATGPT, although it
    required a fair amount of optimization.
    """

    def create_dummy_dataset(self, length, nx=5, ny=20):
        """Build a random (time, lat, lon) dataset including an offset variable.

        :param length: number of time steps; time stamps are consecutive days
            starting at 2000-01-01 on a no-leap calendar.
        :param nx: number of longitude points (0 to 360).
        :param ny: number of latitude points (-90 to 90).
        :return: an ``xr.Dataset`` with three time-dependent variables, their
            ``*_run_mean_10`` companions, a boolean ``global_mask``, a
            ``cell_area`` and an ``offset_variable1`` built on time stamps
            that start four days later.
        """
        time_values = [cftime.DatetimeNoLeap(2000, 1, i + 1) for i in range(length)]
        lat_values = np.linspace(-90, 90, ny)
        lon_values = np.linspace(0, 360, nx)
        # Deterministic ramp, uniform floats and random 0/1 integers.
        variable1 = (
            ('time', 'lat', 'lon'),
            np.arange(length * ny * nx).reshape(length, ny, nx),
        )
        variable2 = (
            ('time', 'lat', 'lon'),
            np.random.rand(length, ny, nx),
        )
        variable3 = (
            ('time', 'lat', 'lon'),
            np.random.randint(0, 2, size=(length, ny, nx)),
        )

        # Create global mask as a boolean array
        global_mask = (('lat', 'lon'), np.random.choice([True, False], size=(ny, nx)))
        cell_area = (('lat', 'lon'), np.arange(ny * nx).reshape(ny, nx))

        # Create variables with offset time values
        # (days start at i + 5 instead of i + 1, i.e. shifted by four days)
        offset_time1 = [cftime.DatetimeNoLeap(2000, 1, i + 5) for i in range(length)]
        off_dims, offset_variable1 = (
            ('time', 'lat', 'lon'),
            np.random.rand(length, ny, nx),
        )
        da_off = xr.DataArray(
            data=offset_variable1,
            dims=off_dims,
            coords={
                'time': offset_time1,
                'lat': lat_values,
                'lon': lon_values,
            },
        )
        dummy_data = {
            'variable1': variable1,
            'variable2': variable2,
            'variable3': variable3,
            'global_mask': global_mask,
            'cell_area': cell_area,
        }

        coords = {
            'time': time_values,
            'lat': lat_values,
            'lon': lon_values,
        }

        dataset = xr.Dataset(data_vars=dummy_data, coords=coords)

        # '-' binds tighter than '|': drop the two static fields first, then
        # add the offset variable to the advertised variable names.
        dataset.attrs['variables'] = list(
            set(dummy_data) - {'global_mask', 'cell_area'} | {'offset_variable1'},
        )

        # Add a running mean with 10 samples to each variable while considering the new dimensions
        for var_name in dummy_data:
            if var_name in ['cell_area', 'global_mask']:
                continue
            # Start from an all-NaN float16 array with the variable's shape.
            rm = np.zeros_like(dataset[var_name].values, dtype=np.float16)
            rm[:] = np.nan
            for lat_idx in range(ny):
                for lon_idx in range(nx):
                    running_mean = np.convolve(
                        dataset[var_name][:, lat_idx, lon_idx],
                        np.ones(10) / 10,
                        mode='valid',
                    )
                    # 'valid' yields length - 9 samples, which fill [5:-4];
                    # the first five and last four steps stay NaN.
                    rm[5:-4, lat_idx, lon_idx] = running_mean
            dataset[var_name + '_run_mean_10'] = (('time', 'lat', 'lon'), rm)
        # NOTE(review): da_off carries its own (offset) time coordinate;
        # presumably xarray aligns it to the dataset's time axis on
        # assignment - confirm that this yields the intended offset series.
        dataset['offset_variable1'] = da_off
        # NOTE(review): this key has a double underscore, unlike the
        # '_run_mean_10' suffix used in the loop above - confirm intended.
        dataset['offset_variable1__run_mean_10'] = da_off

        return dataset

    @given(
        dummy_dataset_length=st.integers(min_value=11, max_value=20),
        random_seed=st.integers(min_value=1, max_value=1000),
    )
    def test_combine_masks(self, dummy_dataset_length, random_seed):
        """Check that a fresh ``VariableMerger`` keeps the dataset and defaults.

        Only the constructor state is asserted here; no merge is executed.
        """
        # Seed numpy so each hypothesis example builds reproducible data.
        np.random.seed(random_seed)
        dummy_dataset = self.create_dummy_dataset(dummy_dataset_length)
        merger = VariableMerger(data_set=dummy_dataset)
        assert merger.data_set.equals(dummy_dataset)
        assert merger.mask_paths is None
        assert merger.merge_method == 'logical_or'


# Run the tests through unittest's ``main`` when executed as a script.
if __name__ == '__main__':
    main()

0 comments on commit 456d3bf

Please sign in to comment.