Skip to content

Commit

Permalink
Region aggregation mip feature
Browse files Browse the repository at this point in the history
- adding min/max/avg aggregator function
- add: w.avg aggregator (aggregate_region/_aggregate)
- cleanup of docstrings
- add tests (for now in test_agrregation_region_aggrfuncs.py, to be moved into test_feature_aggregate)
- added 2-stage (sum) aggregation test. All := sum( sum(north), sum(other) ) and equals sum(all regions)
- removed single regions aggregation test with method='sum' (fails because sum(None) yields 0)
- use np.mean (instead of np.average) for 'avg' method
- do components aggregation ONLY in case of method in ['sum', np.sum]
- add line to RELEASE_NOTES.md
  • Loading branch information
peterkolp authored and zikolach committed Aug 20, 2019
1 parent e50ddf6 commit b9b05ab
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 23 deletions.
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

## Individual Updates

- [#244](https://github.com/IAMconsortium/pyam/pull/244) add aggregation methods (min/max/avg) to aggregate_region and add weighted_average_region function
- [#228](https://github.com/IAMconsortium/pyam/pull/228) Update development environment creation instructions and make pandas requirement more specific
- [#219](https://github.com/IAMconsortium/pyam/pull/219) Add ability to query metadata from iiasa data sources
- [#214](https://github.com/IAMconsortium/pyam/pull/214) Tidy up requirements specifications a little
Expand Down
122 changes: 99 additions & 23 deletions pyam/core.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
YEAR_IDX,
IAMC_IDX,
SORT_IDX,
KNOWN_FUNCS
)
from pyam.read_ixmp import read_ix
from pyam.timeseries import fill_series
Expand Down Expand Up @@ -732,7 +733,7 @@ def normalize(self, inplace=False, **kwargs):
if not inplace:
return ret

def aggregate(self, variable, components=None, append=False):
def aggregate(self, variable, components=None, method='sum', append=False):
"""Compute the aggregate of timeseries components or sub-categories
Parameters
Expand All @@ -741,6 +742,9 @@ def aggregate(self, variable, components=None, append=False):
variable for which the aggregate should be computed
components: list of str, default None
list of variables, defaults to all sub-categories of `variable`
method: func or str
method to use for aggregation
e.g. np.mean, np.sum, ... 'min','max', ...
append: bool, default False
append the aggregate timeseries to `data` and return None,
else return aggregate timeseries
Expand All @@ -755,15 +759,15 @@ def aggregate(self, variable, components=None, append=False):
return

rows = self._apply_filters(variable=components)
_data = _aggregate(self.data[rows], 'variable')
_data = _aggregate(self.data[rows], 'variable', method)

if append is True:
self.append(_data, variable=variable, inplace=True)
else:
return _data

def check_aggregate(self, variable, components=None, exclude_on_fail=False,
multiplier=1, **kwargs):
def check_aggregate(self, variable, components=None, method='sum',
exclude_on_fail=False, multiplier=1, **kwargs):
"""Check whether a timeseries matches the aggregation of its components
Parameters
Expand All @@ -772,6 +776,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
variable to be checked for matching aggregation of sub-categories
components: list of str, default None
list of variables, defaults to all sub-categories of `variable`
method: func or str
method to use for aggregation
exclude_on_fail: boolean, default False
flag scenarios failing validation as `exclude: True`
multiplier: number, default 1
Expand All @@ -786,7 +792,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
# filter and groupby data, use `pd.Series.align` for matching index
rows = self._apply_filters(variable=variable)
df_variable, df_components = (
_aggregate(self.data[rows], 'variable').align(df_components)
_aggregate(self.data[rows], 'variable', method)
.align(df_components)
)

# use `np.isclose` for checking match
Expand All @@ -802,10 +809,60 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,

return IamDataFrame(diff, variable=variable).timeseries()

def weighted_average_region(self, variable, weight, region='World',
                            subregions=None, append=False):
    """Compute the weighted average of a timeseries over multiple regions

    Parameters
    ----------
    variable: str
        variable for which the weighted average should be computed
    weight: str
        weighting variable to be used for calculating the weighted average
    region: str, default 'World'
        region to which the weighted average is assigned
    subregions: list of str
        list of subregions, defaults to all regions other than `region`
    append: bool, default False
        append the aggregate timeseries to `data` and return None,
        else return aggregate timeseries
    """
    # default subregions to all regions other than `region`
    subregions = subregions or self._all_other_regions(region, variable)

    subregion_df = self.filter(region=subregions)
    cols = ['region', 'variable']

    weightvar_df = subregion_df.filter(variable=weight).data
    if weightvar_df.empty:
        msg = "cannot aggregate because no data found "\
              "for weight_var='{}'"
        logger().error(msg.format(weight))
        return

    var_df = subregion_df.filter(variable=variable).data
    # group by all index dimensions other than the value column and the
    # region/variable columns being aggregated over
    cols = [c for c in list(var_df.columns) if c not in ['value'] + cols]
    # align weights with values on the shared index dimensions
    # (identical `left_on`/`right_on` lists simplified to `on`)
    calcdf = pd.merge(var_df, weightvar_df,
                      how='left', suffixes=('', '_weight'),
                      on=['model', 'scenario', 'region', 'year'])
    # weighted average: sum(value * weight) / sum(weight)
    calcdf['value'] = calcdf['value'] * calcdf['value_weight']
    # NOTE: select the columns with a list - the previous tuple-style
    # `['value', 'value_weight']` indexing after groupby is deprecated
    # and removed in recent pandas versions
    calcdf = calcdf.groupby(cols)[['value', 'value_weight']].agg(np.sum)
    calcdf['value'] = calcdf['value'] / calcdf['value_weight']
    _data = calcdf.drop('value_weight', axis=1)

    if append is True:
        self.append(_data, region=region, variable=variable, inplace=True)
    else:
        return _data

def aggregate_region(self, variable, region='World', subregions=None,
components=None, append=False):
components=None, append=False, method='sum'):
"""Compute the aggregate of timeseries over a number of regions
including variable components only defined at the `region` level
This function adds `components` only defined at the `region` level
Parameters
----------
Expand All @@ -823,9 +880,7 @@ def aggregate_region(self, variable, region='World', subregions=None,
else return aggregate timeseries
"""
# default subregions to all regions other than `region`
if subregions is None:
rows = self._apply_filters(variable=variable)
subregions = set(self.data[rows].region) - set([region])
subregions = subregions or self._all_other_regions(region, variable)

if not len(subregions):
msg = 'cannot aggregate variable `{}` to `{}` because it does not'\
Expand All @@ -837,26 +892,34 @@ def aggregate_region(self, variable, region='World', subregions=None,
# compute aggregate over all subregions
subregion_df = self.filter(region=subregions)
cols = ['region', 'variable']
_data = _aggregate(subregion_df.filter(variable=variable).data, cols)
_data = _aggregate(subregion_df.filter(variable=variable).data,
cols, method=method)

# add components at the `region` level, defaults to all variables one
# level below `variable` that are only present in `region`
with adjust_log_level():
region_df = self.filter(region=region)
components = components or (
set(region_df._variable_components(variable)).difference(
subregion_df._variable_components(variable)))

if len(components):
rows = region_df._apply_filters(variable=components)
_data = _data.add(_aggregate(region_df.data[rows], cols),
fill_value=0)
# ONLY in case method is sum/np.sum
if method in ['sum', np.sum]:
with adjust_log_level():
region_df = self.filter(region=region)
components = components or (
set(region_df._variable_components(variable)).difference(
subregion_df._variable_components(variable)))

if len(components):
rows = region_df._apply_filters(variable=components)
_data = _data.add(_aggregate(region_df.data[rows], cols),
fill_value=0)

if append is True:
self.append(_data, region=region, variable=variable, inplace=True)
else:
return _data

def _all_other_regions(self, region, variable):
    """Return the set of regions with data for `variable`, excluding `region`"""
    relevant = self._apply_filters(variable=variable)
    return set(self.data[relevant].region).difference({region})

def check_aggregate_region(self, variable, region='World', subregions=None,
components=None, exclude_on_fail=False,
**kwargs):
Expand Down Expand Up @@ -1386,11 +1449,24 @@ def _meta_idx(data):
return data[META_IDX].drop_duplicates().set_index(META_IDX).index


def _aggregate(df, by, method=np.sum):
    """Aggregate `df` by specified column(s), return indexed `pd.Series`"""
    group_over = [by] if isstr(by) else by
    # index by every column that is neither the value nor aggregated over
    index_cols = [c for c in list(df.columns) if c not in ['value'] + group_over]
    aggregator = _get_method_func(method)
    return df.groupby(index_cols)['value'].agg(aggregator)


def _get_method_func(method):
    """Translate a string to a known method"""
    # non-string arguments are assumed to already be callables
    if not isstr(method):
        return method

    try:
        return KNOWN_FUNCS[method]
    except KeyError:
        # `method` is a string but not in the dict of known methods
        raise ValueError('method `{}` is not a known aggregator'.format(method))


def _raise_filter_error(col):
Expand Down
2 changes: 2 additions & 0 deletions pyam/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
+ ['{}{}'.format(i, j) for i, j in itertools.product(
string.ascii_uppercase, string.ascii_uppercase)]))

# mapping of string aggregation-method names to their numpy implementations
KNOWN_FUNCS = {'min': np.min, 'max': np.max, 'avg': np.mean, 'sum': np.sum}


def requires_package(pkg, msg, error_type=ImportError):
"""Decorator when a function requires an optional dependency
Expand Down
Loading

0 comments on commit b9b05ab

Please sign in to comment.