Include meta in new IamDataFrames returned by aggregation functions #416

Merged (18 commits) on Jul 27, 2020
Changes from 17 commits
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
@@ -1,3 +1,7 @@
+# Next release
+
+- [#416](https://github.com/IAMconsortium/pyam/pull/416) Include `meta` in new IamDataFrames returned by aggregation functions
+
# Release v0.7.0

## Highlights
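A minimal sketch of the change announced in the release note above (model, scenario, and indicator names are illustrative, not taken from the repository):

```python
import pandas as pd
from pyam import IamDataFrame, IAMC_IDX

# two components of 'Primary Energy' for one illustrative scenario
df = IamDataFrame(pd.DataFrame([
    ['model_a', 'scen_a', 'World', 'Primary Energy|Coal', 'EJ/yr', 9, 10],
    ['model_a', 'scen_a', 'World', 'Primary Energy|Wind', 'EJ/yr', 3, 5],
], columns=IAMC_IDX + [2005, 2010]))
df.set_meta('foo', 'string')  # attach a meta indicator

# previously, the returned object started with an empty `meta` table;
# with this PR, indicators of the relevant scenarios are carried over
agg = df.aggregate('Primary Energy')
assert agg.meta['string'].tolist() == ['foo']
```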
93 changes: 42 additions & 51 deletions pyam/core.py
@@ -33,9 +33,9 @@
read_pandas,
format_data,
sort_data,
+    merge_meta,
to_int,
find_depth,
reduce_hierarchy,
pattern_match,
years_match,
month_match,
@@ -76,14 +76,18 @@ class IamDataFrame(object):
Support is provided additionally for R-style data columns for years,
like "X2015", etc.
kwargs
-        if `value=col`, melt column `col` to 'value' and use `col` name as
-        'variable'; or mapping of required columns (:code:`IAMC_IDX`) to
+        If `value=<col>`, melt column `<col>` to 'value' and use `<col>` name
+        as 'variable'; or mapping of required columns (:code:`IAMC_IDX`) to
any of the following:

- one column in `data`
- multiple columns, to be concatenated by :code:`|`
- a string to be used as value for this column

+        A :class:`pandas.DataFrame` with suitable `meta` indicators can be
+        passed as `meta=<df>`. The index will be downselected to those
+        scenarios that have timeseries data.

Notes
-----
When initializing an :class:`IamDataFrame` from an xlsx file,
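A short sketch of the new `meta` keyword documented in the hunk above (data and indicator values are illustrative). The index of the passed table is reduced to the scenarios that actually have timeseries data:

```python
import pandas as pd
from pyam import IamDataFrame, IAMC_IDX, META_IDX

data = pd.DataFrame(
    [['model_a', 'scen_a', 'World', 'Primary Energy', 'EJ/yr', 1, 6]],
    columns=IAMC_IDX + [2005, 2010])

# the indicator table has one extra scenario without timeseries data ...
meta = pd.DataFrame([
    ['model_a', 'scen_a', 1, 'foo'],
    ['model_a', 'scen_b', 2, 'bar'],
], columns=META_IDX + ['number', 'string']).set_index(META_IDX)

# ... which is dropped on initialization
df = IamDataFrame(data, meta=meta)
assert df.meta.index.get_level_values('scenario').tolist() == ['scen_a']
```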
@@ -115,10 +119,14 @@ def _init(self, data, **kwargs):
"""Process data and set attributes for new instance"""
# import data from pd.DataFrame or read from source
if isinstance(data, pd.DataFrame) or isinstance(data, pd.Series):
+            meta = kwargs.pop('meta') if 'meta' in kwargs else None
_data = format_data(data.copy(), **kwargs)
elif has_ix and isinstance(data, ixmp.TimeSeries):
+            # TODO read meta indicators from ixmp
+            meta = None
_data = read_ix(data, **kwargs)
else:
+            meta = None
logger.info('Reading file `{}`'.format(data))
_data = read_file(data, **kwargs)

@@ -135,6 +143,11 @@ def _init(self, data, **kwargs):
self.meta = self.data[META_IDX].drop_duplicates().set_index(META_IDX)
self.reset_exclude()

+        # merge meta dataframe (if given in kwargs)
+        if meta is not None:
+            self.meta = merge_meta(meta.loc[_make_index(self.data)],
+                                   self.meta, ignore_meta_conflict=True)

# if initializing from xlsx, try to load `meta` table from file
meta_sheet = kwargs.get('meta_sheet_name', 'meta')
if isstr(data) and data.endswith('.xlsx') and meta_sheet is not False\
@@ -254,8 +267,9 @@ def append(self, other, ignore_meta_conflict=False, inplace=False,
**kwargs):
"""Append any IamDataFrame-like object to this object

-        Columns in `other.meta` that are not in `self.meta` are always merged,
-        duplicate region-variable-unit-year rows raise a ValueError.
+        Indicators in `other.meta` that are not in `self.meta` are merged.
+        Missing values are set to `NaN`.
+        Conflicting `data` rows always raise a `ValueError`.

Parameters
----------
@@ -266,8 +280,9 @@ def append(self, other, ignore_meta_conflict=False, inplace=False,
any meta columns present in `self` and `other` are not identical.
inplace : bool, default False
if True, do operation inplace and return None
-        kwargs : initializing other as IamDataFrame
+        kwargs
            passed to :class:`IamDataFrame(other, **kwargs) <IamDataFrame>`
+            if `other` is not already an IamDataFrame
"""
if not isinstance(other, IamDataFrame):
other = IamDataFrame(other, **kwargs)
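The `meta` semantics spelled out in the docstring above, as a sketch; the `_frame()` helper and all names and values are illustrative:

```python
import pandas as pd
from pyam import IamDataFrame, IAMC_IDX

def _frame(scenario, variable, value):
    """Illustrative helper returning a one-row IamDataFrame"""
    return IamDataFrame(pd.DataFrame(
        [['model_a', scenario, 'World', variable, 'EJ/yr', value]],
        columns=IAMC_IDX + [2010]))

df_a = _frame('scen_a', 'Primary Energy', 10)
df_a.set_meta(1, 'number')

# `other` overlaps on scen_a (with distinct data rows) and adds scen_b
df_b = _frame('scen_a', 'Primary Energy|Coal', 6)\
    .append(_frame('scen_b', 'Primary Energy', 8))
df_b.set_meta(2, 'number')

# the conflicting `number` values for scen_a raise a ValueError,
# unless conflicts are ignored; then the values in `self` take precedence
df = df_a.append(df_b, ignore_meta_conflict=True)
assert df.meta['number'].tolist() == [1, 2]
```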
@@ -278,41 +293,15 @@

ret = self.copy() if not inplace else self

-        diff = other.meta.index.difference(ret.meta.index)
-        intersect = other.meta.index.intersection(ret.meta.index)
-
-        # merge other.meta columns not in self.meta for existing scenarios
-        if not intersect.empty:
-            # if not ignored, check that overlapping meta dataframes are equal
-            if not ignore_meta_conflict:
-                cols = [i for i in other.meta.columns if i in ret.meta.columns]
-                if not ret.meta.loc[intersect, cols].equals(
-                        other.meta.loc[intersect, cols]):
-                    conflict_idx = (
-                        pd.concat([ret.meta.loc[intersect, cols],
-                                   other.meta.loc[intersect, cols]]
-                                  ).drop_duplicates()
-                        .index.drop_duplicates()
-                    )
-                    msg = 'conflict in `meta` for scenarios {}'.format(
-                        [i for i in pd.DataFrame(index=conflict_idx).index])
-                    raise ValueError(msg)
-
-            cols = [i for i in other.meta.columns if i not in ret.meta.columns]
-            _meta = other.meta.loc[intersect, cols]
-            ret.meta = ret.meta.merge(_meta, how='outer',
-                                      left_index=True, right_index=True)
-
-        # join other.meta for new scenarios
-        if not diff.empty:
-            ret.meta = ret.meta.append(other.meta.loc[diff, :], sort=False)
+        # merge `meta` tables
+        ret.meta = merge_meta(ret.meta, other.meta, ignore_meta_conflict)

# append other.data (verify integrity for no duplicates)
_data = ret.data.set_index(sorted(ret._LONG_IDX)).append(
other.data.set_index(sorted(other._LONG_IDX)),
verify_integrity=True)

-        # merge extra columns in `data` and set `LONG_IDX`
+        # merge extra columns in `data` and set `self._LONG_IDX`
ret.extra_cols += [i for i in other.extra_cols
if i not in ret.extra_cols]
ret._LONG_IDX = IAMC_IDX + [ret.time_col] + ret.extra_cols
@@ -928,7 +917,7 @@ def aggregate(self, variable, components=None, method='sum',
if append is True:
self.append(_df, inplace=True)
else:
-            return IamDataFrame(_df)
+            return IamDataFrame(_df, meta=self.meta)

def check_aggregate(self, variable, components=None, method='sum',
exclude_on_fail=False, multiplier=1, **kwargs):
@@ -1019,7 +1008,7 @@ def aggregate_region(self, variable, region='World', subregions=None,
if append is True:
self.append(_df, region=region, inplace=True)
else:
-            return IamDataFrame(_df, region=region)
+            return IamDataFrame(_df, region=region, meta=self.meta)

def check_aggregate_region(self, variable, region='World', subregions=None,
components=False, method='sum', weight=None,
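The same pattern for regional aggregation, as a sketch with illustrative regions and values; the returned object now carries the `meta` indicators of the source:

```python
import pandas as pd
from pyam import IamDataFrame, IAMC_IDX

df = IamDataFrame(pd.DataFrame([
    ['model_a', 'scen_a', 'reg_a', 'Population', 'million', 3],
    ['model_a', 'scen_a', 'reg_b', 'Population', 'million', 2],
], columns=IAMC_IDX + [2010]))
df.set_meta('foo', 'string')

# sum over all subregions and return a new IamDataFrame incl. `meta`
world = df.aggregate_region('Population', region='World')
assert world.meta['string'].tolist() == ['foo']
```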
@@ -1095,17 +1084,17 @@ def aggregate_time(self, variable, column='subannual', value='year',
----------
variable : str or list of str
variable(s) to be aggregated
-        column : str, default 'subannual'
+        column : str, optional
            the data column to be used as subannual time representation
-        value : str, default 'year
+        value : str, optional
            the name of the aggregated (subannual) time
        components : list of str
            subannual timeslices to be aggregated; defaults to all subannual
-            timeslices other than ``value``
-        method : func or str, default 'sum'
+            timeslices other than `value`
+        method : func or str, optional
            method to use for aggregation,
            e.g. :func:`numpy.mean`, :func:`numpy.sum`, 'min', 'max'
-        append : bool, default False
+        append : bool, optional
append the aggregate timeseries to `self` and return None,
else return aggregate timeseries as new :class:`IamDataFrame`
"""
@@ -1120,9 +1109,7 @@
if append is True:
self.append(_df, inplace=True)
else:
-            df = IamDataFrame(_df)
-            df.meta = self.meta.loc[_make_index(df.data)]
-            return df
+            return IamDataFrame(_df, meta=self.meta)

def downscale_region(self, variable, region='World', subregions=None,
proxy=None, weight=None, append=False):
@@ -1180,9 +1167,7 @@ def downscale_region(self, variable, region='World', subregions=None,
if append is True:
self.append(_data, inplace=True)
else:
-            df = IamDataFrame(_data)
-            df.meta = self.meta.loc[_make_index(df.data)]
-            return df
+            return IamDataFrame(_data, meta=self.meta)

def _all_other_regions(self, region, variable=None):
"""Return list of regions other than `region` containing `variable`"""
@@ -1796,9 +1781,15 @@ def _apply_criteria(df, criteria, **kwargs):


def _make_index(df, cols=META_IDX):
"""Create an index from the columns of a dataframe"""
return pd.MultiIndex.from_tuples(
pd.unique(list(zip(*[df[col] for col in cols]))), names=tuple(cols))
"""Create an index from the columns of a dataframe or series"""
def _get_col(c):
try:
return df.index.get_level_values(c)
except KeyError:
return df[c]

index = pd.unique(list(zip(*[_get_col(col) for col in cols])))
return pd.MultiIndex.from_tuples(index, names=tuple(cols))


def validate(df, criteria={}, exclude_on_fail=False, **kwargs):
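The refactored `_make_index()` now reads model/scenario either from the columns of a long-format dataframe or from the index levels of a `meta`-style object. A rough sketch, importing the private helper only for illustration:

```python
import pandas as pd
from pyam.core import _make_index  # private helper, shown for illustration

data = pd.DataFrame(
    {'model': ['model_a'], 'scenario': ['scen_a'], 'value': [1.0]})
_make_index(data)  # index lookup fails, falls back to the columns
# -> MultiIndex([('model_a', 'scen_a')], names=['model', 'scenario'])

meta = data.set_index(['model', 'scenario'])
_make_index(meta)  # reads the 'model'/'scenario' index levels directly
```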
33 changes: 33 additions & 0 deletions pyam/utils.py
@@ -280,6 +280,39 @@ def sort_data(data, cols):
return data.sort_values(cols)[cols + ['value']].reset_index(drop=True)


+def merge_meta(left, right, ignore_meta_conflict=False):
+    """Merge two `meta` tables; raise if values are in conflict (optional)
+
+    If conflicts are ignored, values in `left` take precedence over `right`.
+    """
+    left = left.copy()  # make a copy to not change the original object
+    diff = right.index.difference(left.index)
+    sect = right.index.intersection(left.index)
+
+    # merge `right` into `left` for overlapping scenarios (`sect`)
+    if not sect.empty:
+        # if not ignored, check that overlapping `meta` columns are equal
+        if not ignore_meta_conflict:
+            cols = [i for i in right.columns if i in left.columns]
+            if not left.loc[sect, cols].equals(right.loc[sect, cols]):
+                conflict_idx = (
+                    pd.concat([right.loc[sect, cols], left.loc[sect, cols]])
+                    .drop_duplicates().index.drop_duplicates()
+                )
+                msg = 'conflict in `meta` for scenarios {}'.format(
+                    [i for i in pd.DataFrame(index=conflict_idx).index])
+                raise ValueError(msg)
+        # merge new columns
+        cols = [i for i in right.columns if i not in left.columns]
+        left = left.merge(right.loc[sect, cols], how='outer',
+                          left_index=True, right_index=True)
+
+    # join `right` for new scenarios (`diff`)
+    if not diff.empty:
+        left = left.append(right.loc[diff, :], sort=False)
+
+    return left

def find_depth(data, s='', level=None):
"""Return or assert the depth (number of ``|``) of variables

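A sketch of the new utility function with illustrative values: `left` wins on conflicting values when `ignore_meta_conflict=True`, while new columns from `right` are still merged.

```python
import pandas as pd
from pyam.utils import merge_meta

idx = pd.MultiIndex.from_tuples(
    [('model_a', 'scen_a')], names=['model', 'scenario'])
left = pd.DataFrame({'number': [1]}, index=idx)
right = pd.DataFrame({'number': [2], 'string': ['foo']}, index=idx)

# merge_meta(left, right) raises:
#   ValueError: conflict in `meta` for scenarios [('model_a', 'scen_a')]
obs = merge_meta(left, right, ignore_meta_conflict=True)
assert obs.loc[('model_a', 'scen_a'), 'number'] == 1  # `left` wins
assert obs.loc[('model_a', 'scen_a'), 'string'] == 'foo'  # column merged
```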
26 changes: 20 additions & 6 deletions tests/conftest.py
@@ -3,19 +3,20 @@
matplotlib.use('agg')

import os
-from requests.exceptions import SSLError
+from requests.exceptions import ConnectionError
import pytest
import numpy as np
import pandas as pd

from datetime import datetime
-from pyam import IamDataFrame, IAMC_IDX, iiasa
+from pyam import IamDataFrame, META_IDX, IAMC_IDX, iiasa


# verify whether IIASA database API can be reached, skip tests otherwise
try:
iiasa.Connection()
IIASA_UNAVAILABLE = False
-except SSLError:
+except ConnectionError:
IIASA_UNAVAILABLE = True

TEST_API = 'integration-test'
@@ -43,6 +44,12 @@
columns=IAMC_IDX + TEST_YEARS,
)

+META_COLS = ['number', 'string']
+META_DF = pd.DataFrame([
+    ['model_a', 'scen_a', 1, 'foo'],
+    ['model_a', 'scen_b', 2, np.nan],
+], columns=META_IDX + META_COLS).set_index(META_IDX)


FULL_FEATURE_DF = pd.DataFrame([
['World', 'Primary Energy', 'EJ/yr', 12, 15],
@@ -120,13 +127,17 @@ def test_df(request):
tdf = TEST_DF.rename({2005: request.param[0], 2010: request.param[1]},
axis="columns")
df = IamDataFrame(data=tdf)
+    for i in META_COLS:
+        df.set_meta(META_DF[i])
yield df


# minimal IamDataFrame for specifically testing 'year'-column features
@pytest.fixture(scope="function")
def test_df_year():
df = IamDataFrame(data=TEST_DF)
+    for i in META_COLS:
+        df.set_meta(META_DF[i])
yield df


@@ -148,7 +159,9 @@ def simple_df(request):
_df = FULL_FEATURE_DF.copy()
if request.param == 'datetime':
_df.rename(DTS_MAPPING, axis="columns", inplace=True)
-    yield IamDataFrame(model='model_a', scenario='scen_a', data=_df)
+    df = IamDataFrame(model='model_a', scenario='scen_a', data=_df)
+    df.set_meta('foo', 'string')
+    yield df


# IamDataFrame with subannual time resolution
@@ -165,8 +178,9 @@ def add_subannual(_data, name, value):
mapping = [('year', 1), ('winter', 0.7), ('summer', 0.3)]
lst = [add_subannual(_df.copy(), name, value) for name, value in mapping]

-    yield IamDataFrame(model='model_a', scenario='scen_a', data=pd.concat(lst))
-
+    df = IamDataFrame(model='model_a', scenario='scen_a', data=pd.concat(lst))
+    df.set_meta('foo', 'string')
+    yield df

@pytest.fixture(scope="function")
def reg_df():
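The fixtures above attach indicators via `set_meta()`. A sketch of that pattern with an illustrative frame: a Series indexed by model/scenario uses its name as the indicator name, and scenarios missing from the series get `NaN`, which is why `META_DF` includes an `np.nan` entry.

```python
import numpy as np
import pandas as pd
from pyam import IamDataFrame, IAMC_IDX, META_IDX

df = IamDataFrame(pd.DataFrame([
    ['model_a', 'scen_a', 'World', 'Primary Energy', 'EJ/yr', 1],
    ['model_a', 'scen_b', 'World', 'Primary Energy', 'EJ/yr', 2],
], columns=IAMC_IDX + [2010]))

# the series name 'number' is used as the name of the indicator
number = pd.Series([1], name='number', index=pd.MultiIndex.from_tuples(
    [('model_a', 'scen_a')], names=META_IDX))
df.set_meta(number)
assert np.isnan(df.meta.loc[('model_a', 'scen_b'), 'number'])
```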
9 changes: 5 additions & 4 deletions tests/test_feature_aggregate.py
@@ -65,7 +65,7 @@ def test_aggregate(simple_df, variable, data):
if simple_df.time_col == 'time':
_df.year = _df.year.replace(DTS_MAPPING)
_df.rename({'year': 'time'}, axis='columns', inplace=True)
-    exp = IamDataFrame(_df)
+    exp = IamDataFrame(_df, meta=simple_df.meta)
for m in ['max', np.max]:
assert_iamframe_equal(simple_df.aggregate(variable, method=m), exp)

@@ -262,10 +262,11 @@ def test_aggregate_region_with_other_method(simple_df, variable, data):
if simple_df.time_col == 'time':
_df.year = _df.year.replace(DTS_MAPPING)
_df.rename({'year': 'time'}, axis='columns', inplace=True)
-    exp = IamDataFrame(_df).filter(region='World')
+
+    exp = IamDataFrame(_df, meta=simple_df.meta).filter(region='World')
    for m in ['max', np.max]:
-        assert_iamframe_equal(simple_df.aggregate_region(variable, method=m),
-                              exp)
+        obs = simple_df.aggregate_region(variable, method=m)
+        assert_iamframe_equal(obs, exp)


def test_aggregate_region_with_components(simple_df):
Expand Down