Skip to content

Commit

Permalink
Add an option to skip existing intermediate variables when aggregatin…
Browse files Browse the repository at this point in the history
…g recursively (#532)

Co-authored-by: Daniel Huppmann <dh@dergelbesalon.at>
  • Loading branch information
pjuergens and danielhuppmann committed Jun 10, 2021
1 parent 8d8aa6b commit 412fcd8
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 46 deletions.
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- [#541](https://github.com/IAMconsortium/pyam/pull/541) Support units in binary operations
- [#538](https://github.com/IAMconsortium/pyam/pull/538) Add option to set defaults in binary operations
- [#537](https://github.com/IAMconsortium/pyam/pull/537) Enhance binary ops to support numerical arguments
- [#532](https://github.com/IAMconsortium/pyam/pull/532) Add an option to skip existing intermediate variables when aggregating recursively
- [#533](https://github.com/IAMconsortium/pyam/pull/533) Add an `apply()` function for custom mathematical operations
- [#527](https://github.com/IAMconsortium/pyam/pull/527) Add an in-dataframe basic mathematical operations `subtract`, `add`, `multiply`, `divide`
- [#523](https://github.com/IAMconsortium/pyam/pull/523) Add feature to compute weights for de-biasing using count
Expand Down
27 changes: 22 additions & 5 deletions pyam/_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
KNOWN_FUNCS,
to_list,
)
from pyam._compare import _compare


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -57,11 +59,12 @@ def _aggregate(df, variable, components=None, method=np.sum):
return _group_and_agg(_df, [], method)


def _aggregate_recursive(df, variable):
def _aggregate_recursive(df, variable, recursive):
"""Recursive aggregation along the variable tree"""

# downselect to components of `variable`, initialize list for aggregated (new) data
_df = df.filter(variable=f"{variable}|*")
# keep variable at highest level if it exists
_df = df.filter(variable=[variable, f"{variable}|*"])
data_list = []

# iterate over variables (bottom-up) and aggregate all components up to `variable`
Expand All @@ -70,9 +73,23 @@ def _aggregate_recursive(df, variable):
var_list = set([reduce_hierarchy(v, -1) for v in components])

# a temporary dataframe allows to distinguish between full data and new data
temp_df = _df.aggregate(variable=var_list)
_df.append(temp_df, inplace=True)
data_list.append(temp_df._data)
_data_agg = _aggregate(_df, variable=var_list)

# check if data for intermediate variables already exists
_data_self = _df.filter(variable=var_list)._data
_overlap = _data_agg.index.intersection(_data_self.index)
_new = _data_agg.index.difference(_data_self.index)

# assert that aggregated values are consistent with existing data (optional)
if recursive != "skip-validate" and not _overlap.empty:
conflict = _compare(_data_self, _data_agg[_overlap], "self", "aggregate")
if not conflict.empty:
msg = "Aggregated values are inconsistent with existing data:"
raise ValueError(f"{msg}\n{conflict}")

# append aggregated values that are not already in data
_df.append(_data_agg[_new], inplace=True)
data_list.append(_data_agg[_new])

return pd.concat(data_list)

Expand Down
22 changes: 22 additions & 0 deletions pyam/_compare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import numpy as np
import pandas as pd


def _compare(
left, right, left_label="left", right_label="right", drop_close=True, **kwargs
):
"""Internal implementation of comparison of IamDataFrames or pd.Series"""

def as_series(s):
return s if isinstance(s, pd.Series) else s._data

ret = pd.merge(
left=as_series(left).rename(index=left_label),
right=as_series(right).rename(index=right_label),
how="outer",
left_index=True,
right_index=True,
)
if drop_close:
ret = ret[~np.isclose(ret[left_label], ret[right_label], **kwargs)]
return ret
33 changes: 17 additions & 16 deletions pyam/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from pyam.read_ixmp import read_ix
from pyam.plotting import PlotAccessor, mpl_args_to_meta_cols
from pyam._compare import _compare
from pyam._aggregate import (
_aggregate,
_aggregate_region,
Expand Down Expand Up @@ -686,7 +687,7 @@ def swap_time_for_year(self, inplace=False):

# assign data and other attributes
ret._LONG_IDX = _index
ret._data = _data.set_index(ret._LONG_IDX)
ret._data = _data.set_index(ret._LONG_IDX).value
ret.time_col = "year"
ret._set_attributes()
delattr(ret, "time")
Expand Down Expand Up @@ -1211,7 +1212,12 @@ def normalize(self, inplace=False, **kwargs):
return ret

def aggregate(
self, variable, components=None, method="sum", recursive=False, append=False
self,
variable,
components=None,
method="sum",
recursive=False,
append=False,
):
"""Aggregate timeseries by components or subcategories within each region
Expand All @@ -1223,8 +1229,11 @@ def aggregate(
Components to be aggregate, defaults to all subcategories of `variable`.
method : func or str, optional
Aggregation method, e.g. :func:`numpy.mean`, :func:`numpy.sum`, 'min', 'max'
recursive : bool, optional
recursive : bool or str, optional
Iterate recursively (bottom-up) over all subcategories of `variable`.
If there are existing intermediate variables, it validates the aggregated
value.
If recursive='skip-validate', it skips the validation.
append : bool, optional
Whether to append aggregated timeseries data to this instance.
Expand All @@ -1244,15 +1253,17 @@ def aggregate(
for individual components as 0.
"""

if recursive is True:
if recursive:
if components is not None:
raise ValueError("Recursive aggregation cannot take `components`!")
if method != "sum":
raise ValueError(
"Recursive aggregation only supported with `method='sum'`!"
)

_df = IamDataFrame(_aggregate_recursive(self, variable), meta=self.meta)
_df = IamDataFrame(
_aggregate_recursive(self, variable, recursive), meta=self.meta
)
else:
_df = _aggregate(self, variable, components=components, method=method)

Expand Down Expand Up @@ -2643,17 +2654,7 @@ def compare(
kwargs : arguments for comparison of values
passed to :func:`numpy.isclose`
"""
ret = pd.concat(
{
left_label: left.data.set_index(left._LONG_IDX),
right_label: right.data.set_index(right._LONG_IDX),
},
axis=1,
)
ret.columns = ret.columns.droplevel(1)
if drop_close:
ret = ret[~np.isclose(ret[left_label], ret[right_label], **kwargs)]
return ret
return _compare(left, right, left_label, right_label, drop_close=True, **kwargs)


def concat(dfs, ignore_meta_conflict=False, **kwargs):
Expand Down
30 changes: 30 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,18 @@
)


# Timeseries data with a nested variable tree ("Electricity" > "Wind" > "Offshore",
# "Onshore") plus a sibling "Solar", used for testing recursive aggregation.
# The values are consistent along the tree: "Wind" equals "Offshore" + "Onshore"
# in both value columns, and "Electricity" equals "Wind" + "Solar" in the second;
# in the first value column "Solar" is NaN and "Electricity" equals "Wind".
# NOTE(review): assumes TEST_YEARS holds exactly two entries (one per value column).
RECURSIVE_DF = pd.DataFrame(
[
["Secondary Energy|Electricity", "EJ/yr", 5, 19.0],
["Secondary Energy|Electricity|Wind", "EJ/yr", 5, 17],
["Secondary Energy|Electricity|Wind|Offshore", "EJ/yr", 1, 5],
["Secondary Energy|Electricity|Wind|Onshore", "EJ/yr", 4, 12],
["Secondary Energy|Electricity|Solar", "EJ/yr", np.nan, 2],
],
columns=["variable", "unit"] + TEST_YEARS,
)


TEST_STACKPLOT_DF = pd.DataFrame(
[
["World", "Emissions|CO2|Energy|Oil", "Mt CO2/yr", 2, 3.2, 2.0, 1.8],
Expand Down Expand Up @@ -210,6 +222,24 @@ def plot_df():
yield df


# IamDataFrame with two scenarios and structure for recursive aggregation
@pytest.fixture(scope="function", params=["year", "datetime"])
def recursive_df(request):
    """Yield an IamDataFrame built from `RECURSIVE_DF` with two scenarios

    The fixture is parametrized over "year" and "datetime"; for the latter,
    the columns of `RECURSIVE_DF` are renamed using `DTS_MAPPING` first.
    Scenario "scen_b" is a copy of "scen_a" with all values doubled.
    """
    if request.param == "year":
        data = RECURSIVE_DF
    else:
        data = RECURSIVE_DF.rename(DTS_MAPPING, axis="columns")

    scen_a = IamDataFrame(data, model="model_a", scenario="scen_a", region="World")

    # derive the second scenario from the first and scale its values
    scen_b = scen_a.rename(scenario={"scen_a": "scen_b"})
    scen_b._data *= 2
    scen_a.append(scen_b, inplace=True)

    yield scen_a


@pytest.fixture(scope="session")
def plot_stackplot_df():
df = IamDataFrame(TEST_STACKPLOT_DF)
Expand Down
51 changes: 26 additions & 25 deletions tests/test_feature_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,6 @@
columns=LONG_IDX + ["value"],
)

RECURSIVE_DF = pd.DataFrame(
[
["Secondary Energy|Electricity", "EJ/yr", 5, 19.0],
["Secondary Energy|Electricity|Wind", "EJ/yr", 5, 17],
["Secondary Energy|Electricity|Wind|Offshore", "EJ/yr", 1, 5],
["Secondary Energy|Electricity|Wind|Onshore", "EJ/yr", 4, 12],
["Secondary Energy|Electricity|Solar", "EJ/yr", np.nan, 2],
],
columns=["variable", "unit"] + TEST_YEARS,
)


@pytest.mark.parametrize(
"variable,data",
Expand Down Expand Up @@ -133,31 +122,43 @@ def test_aggregate_by_list_with_components_raises(simple_df):
pytest.raises(ValueError, simple_df.aggregate, v, components=components)


@pytest.mark.parametrize("time_col", (("year"), ("time")))
def test_aggregate_recursive(time_col):
def test_aggregate_recursive(recursive_df):
# use the feature `recursive=True`
data = (
RECURSIVE_DF
if time_col == "year"
else RECURSIVE_DF.rename(DTS_MAPPING, axis="columns")
)
df = IamDataFrame(data, model="model_a", scenario="scen_a", region="World")
df2 = df.rename(scenario={"scen_a": "scen_b"})
df2.data.value *= 2
df.append(df2, inplace=True)

# create object without variables to be aggregated
v = "Secondary Energy|Electricity"
agg_vars = [f"{v}{i}" for i in ["", "|Wind"]]
df_minimal = df.filter(variable=agg_vars, keep=False)
df_minimal = recursive_df.filter(variable=agg_vars, keep=False)

# return recursively aggregated data as new object
obs = df_minimal.aggregate(variable=v, recursive=True)
assert_iamframe_equal(obs, df.filter(variable=agg_vars))
assert_iamframe_equal(obs, recursive_df.filter(variable=agg_vars))

# append to `self`
df_minimal.aggregate(variable=v, recursive=True, append=True)
assert_iamframe_equal(df_minimal, df)
assert_iamframe_equal(df_minimal, recursive_df)


def test_aggregate_skip_intermediate(recursive_df):
    """Inconsistent intermediate data must raise unless validation is skipped"""

    # shift two existing values so that the data is no longer consistent
    for row in (0, 3):
        recursive_df._data.iloc[row] = recursive_df._data.iloc[row] + 2

    top = "Secondary Energy|Electricity"
    aggregates = [top, f"{top}|Wind"]

    # remove only the top-level variable from "scen_a" (the intermediate variable
    # stays), and remove both aggregated variables from "scen_b"
    reduced = recursive_df.filter(variable=top, scenario="scen_a", keep=False)
    reduced.filter(variable=aggregates, scenario="scen_b", keep=False, inplace=True)

    # with validation (the default for `recursive=True`), aggregation fails
    with pytest.raises(
        ValueError, match="Aggregated values are inconsistent with existing data:"
    ):
        reduced.aggregate(variable=top, recursive=True, append=True)

    # without validation, the missing variables are appended to `self`
    reduced.aggregate(variable=top, recursive="skip-validate", append=True)
    assert_iamframe_equal(reduced, recursive_df)


@pytest.mark.parametrize(
Expand Down

0 comments on commit 412fcd8

Please sign in to comment.