In [9]:
from pathlib import Path

import numpy as np
from xarray import DataArray
import matplotlib.pyplot as plt
import itertools
import pandas as pd
import pytest
import ipytest
ipytest.autoconfig()

import weakref

import numpy as np
import pytest
import xarray as xr

from climate_index_collection.reductions import (
    grouped_mean_weighted,
    monthly_mean_weighted,
    monthly_mean_unweighted,
)
from climate_index_collection.indices import el_nino_southern_oscillation_34

In [2]:
time = pd.to_datetime(["2020-02-13", "2021-06-13", "2021-08-13", "2022-02-13"])
time.month

Int64Index([2, 6, 8, 2], dtype='int64')

In [43]:
# ========
# CREATE TEST DATA 0
# ========

lon = np.array([120,140,150])
lat = np.array([-10, -5, 0])

def create_data_array(values,  group, group_name) :
    """
    This function creates test DataArrays from given lat, lon, group and groupname and weights.
    -----
    Parameters:
        lat: numpy.adarray, list
        lon: numpy.adarray, list
        values: numpy.adarray
        group: numpy.adarray, list
        groupname: str
    """
    # create dummy dataset 
    data = DataArray(values, 
                     dims=(group_name, 
                           "lat", 
                           "lon"), 
                     coords={group_name : group, 
                             "lat": lat, 
                             'lon': lon})
    return data

def create_weight_array(wei, group, group_name) :    
    weights = DataArray(wei, 
                        dims=(group_name), 
                        coords={group_name : group})
    return weights 

def create_mean_array(mean, group_unique, group_name) :    
    weights = DataArray(mean, 
                 dims=(group_name, "lat", "lon"), 
                 coords={group_name : group_unique, "lat": lat, 'lon': lon})
    return weights 
# ----------
# First test DataArray
weights = [1, 2, 3, 2]
group = ['a','b','a','c']
group_name_1 = "group" 
group_unique_1 = np.unique(group)

np.random.seed(100)
values = np.random.randint(0,2, (len(group), len(lat), len(lon)) ).astype(float)
values[0,0,0] = np.nan
data_1 = create_data_array(values = values, 
                            group = group,
                            group_name = group_name_1)
weights_1 = create_weight_array(wei = weights,
                                group = group,
                                group_name = group_name_1)


# Should be the correct values
weighted_mean_1 = np.array(
      [[[0.  , 0.75, 0.25],
        [1.  , 0.25, 0.25],
        [0.  , 0.75, 0.75]],

       [[0.  , 0.  , 1.  ],
        [0.  , 0.  , 0.  ],
        [0.  , 1.  , 0.  ]],

       [[1.  , 0.  , 0.  ],
        [1.  , 0.  , 0.  ],
        [1.  , 1.  , 1.  ]]])

weighted_mean_should_1 = create_mean_array(mean = weighted_mean,
                                        group_unique=group_unique_1,
                                        group_name = group_name_1)
# ----------
# Second test DataArray
time = pd.to_datetime(["2020-02-13", "2021-06-13", "2021-08-13", "2022-02-13"])
group_2 = time.days_in_month
group_unique_2 = np.unique(time.month)
group_name = "month"
# create values Note that we will use the 
#np.random.seed(100)
#values = np.random.randint(0,2, (len(group), len(lat), len(lon)) ).astype(float) * (29+28)
#values[0,0,0] = np.nan
values_2 = np.array(
      [[[np.nan,  0., 57.],
        [57., 57., 57.],
        [ 0.,  0.,  0.]],

       [[ 0.,  0., 57.],
        [ 0.,  0.,  0.],
        [ 0., 57.,  0.]],

       [[ 0., 57.,  0.],
        [57.,  0.,  0.],
        [ 0., 57., 57.]],

       [[57.,  0.,  0.],
        [57.,  0.,  0.],
        [57., 57., 57.]]])
unweighted_mean_2 = np.array(
      [[[57,  0., 28.5,],
        [57, 28.5, 28.5,],
        [28.5, 28.5, 28.5]],

       [[ 0.,  0., 57.],
        [ 0.,  0.,  0.],
        [ 0., 57.,  0.]],

       [[ 0., 57.,  0.],
        [57.,  0.,  0.],
        [ 0., 57., 57.]]])

weighted_mean_2 = np.array(
      [[[57.,  0., 29.],
        [57., 29., 29.],
        [28., 28., 28.]],

       [[ 0.,  0., 57.],
        [ 0.,  0.,  0.],
        [ 0., 57.,  0.]],

       [[ 0., 57.,  0.],
        [57.,  0.,  0.],
        [ 0., 57., 57.]]])
# # calculate the mean values
# # get both february values and replace nan with 0, as xarray does it in the calculation
# feb_2020 = values[0]
# #feb_2020[np.isnan(feb_2020)] = 0
# feb_2020_len = 29
# feb_2022 = values[3]
# #feb_2022[np.isnan(feb_2022)] = 0
# feb_2022_len = 28
# # for the mean calcultion we need to take care of nans.
# # Those will not be acconuted for at all!
# numerator = np.nansum(np.dstack(
#                         (feb_2020 * feb_2020_len , feb_2022 * feb_2022_len))
#                       , 2)
# denominator_2020 = (~np.isnan(feb_2020)).astype(int) * feb_2020_len
# denominator_2022 = (~np.isnan(feb_2022)).astype(int) * feb_2022_len
# denominator = denominator_2020 + denominator_2022
# feb_weighted_mean = numerator / denominator
# weighted_mean = np.array((feb_weighted_mean, values[1], values[2]))

weighted_mean_should_2 = create_mean_array(mean = weighted_mean_2,
                                        group_unique=group_unique_2,
                                        group_name = group_name_2)
unweighted_mean_should_2 = create_mean_array(mean = unweighted_mean_2,
                                        group_unique=group_unique_2,
                                        group_name = group_name_2)

data_2, weights_2, group_name_2, group_unique_2 = create_test_dataset(
                                    lat = lat,
                                    lon = lon,
                                    values = values_2, 
                                    wei = weight_time,
                                    group = time,
                                    group_name = "time")
# aslo create a dataset
dataset_2 = data_2.to_dataset(dim=None, name="test", promote_attrs=False)

ValueError: conflicting sizes for dimension 'time': length 3 on the data but length 4 on coordinate 'time'

In [33]:
@pytest.mark.parametrize("data, weights, dim     , groupby_dim, weighted_mean_should",[ 
    (data_1, weights_1, group_name_1 , group_name_1 , group_mean_should_1),
    (data_2, weights_2, "time" , "time.month" , group_mean_should_2),
                         ])
def test_grouped_mean_weighted(data, weights, dim, groupby_dim, weighted_mean_should):
    """Checks if the groupby weighting function gives proper results."""
    result = grouped_mean_weighted(dobj=data, weights= weights, dim = dim, groupby_dim= groupby_dim)
    assert result.equals(weighted_mean_should)

@pytest.mark.parametrize("data_set, name, weighted_mean_should",[ 
        (dataset_2, "test", weighted_mean_2),
])
def test_monthly_mean_unweighted(data_set, name, weighted_mean_should):
    """Checks if the monthly mean weighted function gives proper results."""
    result = monthly_mean_weighted(dobj=data_set)
    print(result)
    print(weighted_mean_2)
    assert result[name].equals(weighted_mean_should)


In [34]:
ipytest.run()

platform linux -- Python 3.9.12, pytest-7.1.2, pluggy-1.0.0
rootdir: /work, configfile: pyproject.toml
plugins: anyio-3.5.0
collected 3 items

tmpouoiqvwr.py [32m.[0m[32m.[0m[31mF[0m[31m                                                                           [100%][0m

[31m[1m________________ test_monthly_mean_unweighted[data_set0-test-weighted_mean_should0] ________________[0m

data_set = <xarray.Dataset>
Dimensions:  (time: 4, lat: 3, lon: 3)
Coordinates:
  * time     (time) datetime64[ns] 2020-02-13 202... (lon) int64 120 140 150
Data variables:
    test     (time, lat, lon) float64 nan 0.0 57.0 57.0 ... 0.0 57.0 57.0 57.0
name = 'test'
weighted_mean_should = array([[[57.,  0., 29.],
        [57., 29., 29.],
        [28., 28., 28.]],

       [[ 0.,  0., 57.],
        [ 0.,  0.,  0.],
        [ 0., 57.,  0.]],

       [[ 0., 57.,  0.],
        [57.,  0.,  0.],
        [ 0., 57., 57.]]])

    [37m@pytest[39;49;00m.mark.parametrize([33m"[39;49;00m[33mdata_set, name, w

<ExitCode.TESTS_FAILED: 1>

In [38]:

def test_monthly_mean_weighted(data_set, name, weighted_mean_should):
    """Checks if the monthly mean weighted function gives proper results."""
    result = monthly_mean_weighted(dobj=data_set)
    print("result", result["test"])
    print("wM", weighted_mean_2)
    
test_monthly_mean_weighted(dataset_2, "test", weighted_mean_2)

result <xarray.DataArray 'test' (month: 3, lat: 3, lon: 3)>
array([[[57.,  0., 29.],
        [57., 29., 29.],
        [28., 28., 28.]],

       [[ 0.,  0., 57.],
        [ 0.,  0.,  0.],
        [ 0., 57.,  0.]],

       [[ 0., 57.,  0.],
        [57.,  0.,  0.],
        [ 0., 57., 57.]]])
Coordinates:
  * lat      (lat) int64 -10 -5 0
  * lon      (lon) int64 120 140 150
  * month    (month) int64 2 6 8
wM [[[57.  0. 29.]
  [57. 29. 29.]
  [28. 28. 28.]]

 [[ 0.  0. 57.]
  [ 0.  0.  0.]
  [ 0. 57.  0.]]

 [[ 0. 57.  0.]
  [57.  0.  0.]
  [ 0. 57. 57.]]]


In [30]:
monthly_mean_weighted?

[0;31mSignature:[0m [0mmonthly_mean_weighted[0m[0;34m([0m[0mdobj[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Calculates the weighted monthly mean values of a dataset.
It will make use of the grouped_mean_weighted function, which is similar to the mean_weigthed function, 
but additionally allow to include a dimension to group the data by.
It takes care of leap years and thus differs from "monthly_mean_unweighted"
Adapted from: https://docs.xarray.dev/en/stable/examples/monthly-means.html

Parameters
----------
dobj: xarray.Dataset or xarray.DataArray
    Contains the original data.
Returns
-------
xarray.Dataset or xarray.DataArray
    Monthly mean data. Has the same variable name(s) as dobj. 
    Dimension 'time' will be removed.
    Dimension 'month' is gained. 
        Int values, starting with 1 for January and ending with 12 for December.
[0;31mFile:[0m      /work/climate_index_collection/reductions.py
[0;31mType:[0m      function


In [17]:
result.equals(group_mean_should_2)

True