From 412fcd82463ad4c51fc2efe475ab7ddeee7b13b9 Mon Sep 17 00:00:00 2001 From: pjuergens <74722312+pjuergens@users.noreply.github.com> Date: Thu, 10 Jun 2021 17:23:57 +0200 Subject: [PATCH] Add an option to skip existing intermediate variables when aggregating recursivly (#532) Co-authored-by: Daniel Huppmann --- RELEASE_NOTES.md | 1 + pyam/_aggregate.py | 27 +++++++++++++---- pyam/_compare.py | 22 ++++++++++++++ pyam/core.py | 33 ++++++++++----------- tests/conftest.py | 30 +++++++++++++++++++ tests/test_feature_aggregate.py | 51 +++++++++++++++++---------------- 6 files changed, 118 insertions(+), 46 deletions(-) create mode 100644 pyam/_compare.py diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 0966bb1fb..f7367a690 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -3,6 +3,7 @@ - [#541](https://github.com/IAMconsortium/pyam/pull/541) Support units in binary operations - [#538](https://github.com/IAMconsortium/pyam/pull/538) Add option to set defaults in binary operations - [#537](https://github.com/IAMconsortium/pyam/pull/537) Enhance binary ops to support numerical arguments +- [#532](https://github.com/IAMconsortium/pyam/pull/532) Add an option to skip existing intermediate variables when aggregating recursivly - [#533](https://github.com/IAMconsortium/pyam/pull/533) Add an `apply()` function for custom mathematical operations - [#527](https://github.com/IAMconsortium/pyam/pull/527) Add an in-dataframe basic mathematical operations `subtract`, `add`, `multiply`, `divide` - [#523](https://github.com/IAMconsortium/pyam/pull/523) Add feature to compute weights for de-biasing using count diff --git a/pyam/_aggregate.py b/pyam/_aggregate.py index cc78eb4d3..7b63ba06e 100644 --- a/pyam/_aggregate.py +++ b/pyam/_aggregate.py @@ -13,6 +13,8 @@ KNOWN_FUNCS, to_list, ) +from pyam._compare import _compare + logger = logging.getLogger(__name__) @@ -57,11 +59,12 @@ def _aggregate(df, variable, components=None, method=np.sum): return _group_and_agg(_df, [], method) -def _aggregate_recursive(df, variable): +def _aggregate_recursive(df, variable, recursive): """Recursive aggregation along the variable tree""" # downselect to components of `variable`, initialize list for aggregated (new) data - _df = df.filter(variable=f"{variable}|*") + # keep variable at highest level if it exists + _df = df.filter(variable=[variable, f"{variable}|*"]) data_list = [] # iterate over variables (bottom-up) and aggregate all components up to `variable` @@ -70,9 +73,23 @@ def _aggregate_recursive(df, variable): var_list = set([reduce_hierarchy(v, -1) for v in components]) # a temporary dataframe allows to distinguish between full data and new data - temp_df = _df.aggregate(variable=var_list) - _df.append(temp_df, inplace=True) - data_list.append(temp_df._data) + _data_agg = _aggregate(_df, variable=var_list) + + # check if data for intermediate variables already exists + _data_self = _df.filter(variable=var_list)._data + _overlap = _data_agg.index.intersection(_data_self.index) + _new = _data_agg.index.difference(_data_self.index) + + # assert that aggregated values are consistent with existing data (optional) + if recursive != "skip-validate" and not _overlap.empty: + conflict = _compare(_data_self, _data_agg[_overlap], "self", "aggregate") + if not conflict.empty: + msg = "Aggregated values are inconsistent with existing data:" + raise ValueError(f"{msg}\n{conflict}") + + # append aggregated values that are not already in data + _df.append(_data_agg[_new], inplace=True) + data_list.append(_data_agg[_new]) return pd.concat(data_list) diff --git a/pyam/_compare.py b/pyam/_compare.py new file mode 100644 index 000000000..37cc331c5 --- /dev/null +++ b/pyam/_compare.py @@ -0,0 +1,22 @@ +import numpy as np +import pandas as pd + + +def _compare( + left, right, left_label="left", right_label="right", drop_close=True, **kwargs +): + """Internal implementation of comparison of IamDataFrames or pd.Series""" + + def as_series(s): + return s if isinstance(s, pd.Series) else s._data + + ret = pd.merge( + left=as_series(left).rename(index=left_label), + right=as_series(right).rename(index=right_label), + how="outer", + left_index=True, + right_index=True, + ) + if drop_close: + ret = ret[~np.isclose(ret[left_label], ret[right_label], **kwargs)] + return ret diff --git a/pyam/core.py b/pyam/core.py index e1743e4ac..7cf8ec3a5 100755 --- a/pyam/core.py +++ b/pyam/core.py @@ -58,6 +58,7 @@ ) from pyam.read_ixmp import read_ix from pyam.plotting import PlotAccessor, mpl_args_to_meta_cols +from pyam._compare import _compare from pyam._aggregate import ( _aggregate, _aggregate_region, @@ -686,7 +687,7 @@ def swap_time_for_year(self, inplace=False): # assign data and other attributes ret._LONG_IDX = _index - ret._data = _data.set_index(ret._LONG_IDX) + ret._data = _data.set_index(ret._LONG_IDX).value ret.time_col = "year" ret._set_attributes() delattr(ret, "time") @@ -1211,7 +1212,12 @@ def normalize(self, inplace=False, **kwargs): return ret def aggregate( - self, variable, components=None, method="sum", recursive=False, append=False + self, + variable, + components=None, + method="sum", + recursive=False, + append=False, ): """Aggregate timeseries by components or subcategories within each region @@ -1223,8 +1229,11 @@ def aggregate( Components to be aggregate, defaults to all subcategories of `variable`. method : func or str, optional Aggregation method, e.g. :func:`numpy.mean`, :func:`numpy.sum`, 'min', 'max' - recursive : bool, optional + recursive : bool or str, optional Iterate recursively (bottom-up) over all subcategories of `variable`. + If there are existing intermediate variables, it validates the aggregated + value. + If recursive='skip-validate', it skips the validation. append : bool, optional Whether to append aggregated timeseries data to this instance. @@ -1244,7 +1253,7 @@ def aggregate( for individual components as 0. """ - if recursive is True: + if recursive: if components is not None: raise ValueError("Recursive aggregation cannot take `components`!") if method != "sum": @@ -1252,7 +1261,9 @@ def aggregate( "Recursive aggregation only supported with `method='sum'`!" ) - _df = IamDataFrame(_aggregate_recursive(self, variable), meta=self.meta) + _df = IamDataFrame( + _aggregate_recursive(self, variable, recursive), meta=self.meta + ) else: _df = _aggregate(self, variable, components=components, method=method) @@ -2643,17 +2654,7 @@ def compare( kwargs : arguments for comparison of values passed to :func:`numpy.isclose` """ - ret = pd.concat( - { - left_label: left.data.set_index(left._LONG_IDX), - right_label: right.data.set_index(right._LONG_IDX), - }, - axis=1, - ) - ret.columns = ret.columns.droplevel(1) - if drop_close: - ret = ret[~np.isclose(ret[left_label], ret[right_label], **kwargs)] - return ret + return _compare(left, right, left_label, right_label, drop_close=True, **kwargs) def concat(dfs, ignore_meta_conflict=False, **kwargs): diff --git a/tests/conftest.py b/tests/conftest.py index 9acf50009..cca7e0f6a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -105,6 +105,18 @@ ) +RECURSIVE_DF = pd.DataFrame( + [ + ["Secondary Energy|Electricity", "EJ/yr", 5, 19.0], + ["Secondary Energy|Electricity|Wind", "EJ/yr", 5, 17], + ["Secondary Energy|Electricity|Wind|Offshore", "EJ/yr", 1, 5], + ["Secondary Energy|Electricity|Wind|Onshore", "EJ/yr", 4, 12], + ["Secondary Energy|Electricity|Solar", "EJ/yr", np.nan, 2], + ], + columns=["variable", "unit"] + TEST_YEARS, +) + + TEST_STACKPLOT_DF = pd.DataFrame( [ ["World", "Emissions|CO2|Energy|Oil", "Mt CO2/yr", 2, 3.2, 2.0, 1.8], @@ -210,6 +222,24 @@ def plot_df(): yield df +# IamDataFrame with two scenarios and structure for recursive aggregation +@pytest.fixture(scope="function", params=["year", "datetime"]) +def recursive_df(request): + + data = ( + RECURSIVE_DF + if request.param == "year" + else RECURSIVE_DF.rename(DTS_MAPPING, axis="columns") + ) + + df = IamDataFrame(data, model="model_a", scenario="scen_a", region="World") + df2 = df.rename(scenario={"scen_a": "scen_b"}) + df2._data *= 2 + df.append(df2, inplace=True) + + yield df + + @pytest.fixture(scope="session") def plot_stackplot_df(): df = IamDataFrame(TEST_STACKPLOT_DF) diff --git a/tests/test_feature_aggregate.py b/tests/test_feature_aggregate.py index bb4ff98a3..e3bbe17c3 100644 --- a/tests/test_feature_aggregate.py +++ b/tests/test_feature_aggregate.py @@ -42,17 +42,6 @@ columns=LONG_IDX + ["value"], ) -RECURSIVE_DF = pd.DataFrame( - [ - ["Secondary Energy|Electricity", "EJ/yr", 5, 19.0], - ["Secondary Energy|Electricity|Wind", "EJ/yr", 5, 17], - ["Secondary Energy|Electricity|Wind|Offshore", "EJ/yr", 1, 5], - ["Secondary Energy|Electricity|Wind|Onshore", "EJ/yr", 4, 12], - ["Secondary Energy|Electricity|Solar", "EJ/yr", np.nan, 2], - ], - columns=["variable", "unit"] + TEST_YEARS, -) - @pytest.mark.parametrize( "variable,data", @@ -133,31 +122,43 @@ def test_aggregate_by_list_with_components_raises(simple_df): pytest.raises(ValueError, simple_df.aggregate, v, components=components) -@pytest.mark.parametrize("time_col", (("year"), ("time"))) -def test_aggregate_recursive(time_col): +def test_aggregate_recursive(recursive_df): # use the feature `recursive=True` - data = ( - RECURSIVE_DF - if time_col == "year" - else RECURSIVE_DF.rename(DTS_MAPPING, axis="columns") - ) - df = IamDataFrame(data, model="model_a", scenario="scen_a", region="World") - df2 = df.rename(scenario={"scen_a": "scen_b"}) - df2.data.value *= 2 - df.append(df2, inplace=True) # create object without variables to be aggregated v = "Secondary Energy|Electricity" agg_vars = [f"{v}{i}" for i in ["", "|Wind"]] - df_minimal = df.filter(variable=agg_vars, keep=False) + df_minimal = recursive_df.filter(variable=agg_vars, keep=False) # return recursively aggregated data as new object obs = df_minimal.aggregate(variable=v, recursive=True) - assert_iamframe_equal(obs, df.filter(variable=agg_vars)) + assert_iamframe_equal(obs, recursive_df.filter(variable=agg_vars)) # append to `self` df_minimal.aggregate(variable=v, recursive=True, append=True) - assert_iamframe_equal(df_minimal, df) + assert_iamframe_equal(df_minimal, recursive_df) + + +def test_aggregate_skip_intermediate(recursive_df): + # make the data inconsistent, check (and then skip) validation + + recursive_df._data.iloc[0] = recursive_df._data.iloc[0] + 2 + recursive_df._data.iloc[3] = recursive_df._data.iloc[3] + 2 + + # create object without variables to be aggregated, but with intermediate variables + v = "Secondary Energy|Electricity" + df_minimal = recursive_df.filter(variable=v, scenario="scen_a", keep=False) + agg_vars = [f"{v}{i}" for i in ["", "|Wind"]] + df_minimal.filter(variable=agg_vars, scenario="scen_b", keep=False, inplace=True) + + # simply calling recursive aggregation raises an error + match = "Aggregated values are inconsistent with existing data:" + with pytest.raises(ValueError, match=match): + df_minimal.aggregate(variable=v, recursive=True, append=True) + + # append to `self` with skipping validation + df_minimal.aggregate(variable=v, recursive="skip-validate", append=True) + assert_iamframe_equal(df_minimal, recursive_df) @pytest.mark.parametrize(