Skip to content

Commit

Permalink
Partial region aggregation (#99)
Browse files Browse the repository at this point in the history
* Add pass through for model native results

* Modify tests for partial aggregation

* Update tests/data/region_processing/dsd/variable/variables.yaml

Co-authored-by: Daniel Huppmann <dh@dergelbesalon.at>

* Reset tests to create dedicated partial aggregation test

* Add deep copy of variable list for each common region

* Create dedicated test for partial aggregation

* Add basic logging setup

* Add _combine_aggregate_and_model_native

* Modify test for checking for warning message

* Appease stickler

* Fix merge conflicts

* Remove unneccesary comments

* Apply suggestions from @danielhuppmann

* Add log check to weighted aggregation test

* Alternative implementation for partial-aggregation

* Appease stickler

* Restructure comparsion into function

* Add tests for partial region aggregation

* Make pyam logging adjustment explicit

* Fix bug connected to region-aggregation attribute

* Add test for region-aggregation bug

* Add partial region aggregation to docs

Co-authored-by: Daniel Huppmann <dh@dergelbesalon.at>
  • Loading branch information
phackstock and danielhuppmann authored Mar 8, 2022
1 parent 2f71689 commit c03f9d3
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 21 deletions.
2 changes: 1 addition & 1 deletion doc/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ API documentation
.. _toplevel-functions:

Top-level functions
----------------
-------------------

.. autofunction:: process

Expand Down
43 changes: 43 additions & 0 deletions doc/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,46 @@ This example illustrates how such a model mapping looks like:
The names of the constituent regions **must** refer to the **original** model
native region names. In the above example *region_a* and *region_b* and **not**
*alternative_name_a*.

Partial region aggregation
~~~~~~~~~~~~~~~~~~~~~~~~~~

During the region aggregation process provided and aggregated data are combined in a
process dubbed 'partial region aggregation'.

As an example, consider the following model mapping:

.. code:: yaml
model: model_a
common_regions:
- common_region_1:
- region_a
- region_b
If the data provided for region aggregation contains results for *common_region_1* they
are compared and combined according to the following logic:

1. If a variable is **not** reported for *common_region_1*, it is calculated through
region aggregation of regions *region_a* and *region_b*.
2. If a variable is **only** reported for *common_region_1* level it is used directly.
3. If a variable is is reported for *common_region_1* **as well as** *region_a* and
*region_b*. The **provided results** take **precedence** over the aggregated ones.
Additionally, the aggregation is computed and compared to the provided results. If
there are discrepancies, a warning is written to the logs.

.. note::

Please note that in case of differences no error is raised. Therefore it is
necessary to check the logs to find out if there were any differences. This is
intentional since some differences might be expected.

More points to note:

* The attribute `skip-region-aggregation` which can be set to `true` for variables as
part of a DataStructureDefinition has the following effect. If `true`, aggregation for
this variable is skipped. However, if the variable is part of the provided data, it
**is** used.
* The `region-aggregation` attribute works with partial region aggregation. If a
variable is found in the provided data, it is used over aggregated results. Any
discrepancies between the provided and aggregated data are written to the log.
7 changes: 6 additions & 1 deletion nomenclature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
RegionAggregationMapping,
)


# set up logging
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO,
)

logger = logging.getLogger(__name__)

# get version number either from git (preferred) or metadata
Expand Down
80 changes: 62 additions & 18 deletions nomenclature/processor/region.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@
from pathlib import Path
from typing import Dict, List, Optional, Set, Union

import jsonschema
import pyam
import pydantic
import yaml
import jsonschema
from pyam import IamDataFrame
from pyam.logging import adjust_log_level
from pydantic import BaseModel, root_validator, validate_arguments, validator
from pydantic.types import DirectoryPath, FilePath
from pydantic.error_wrappers import ErrorWrapper

from nomenclature.definition import DataStructureDefinition
from nomenclature.codelist import PYAM_AGG_KWARGS
from nomenclature.definition import DataStructureDefinition
from nomenclature.error.region import (
ModelMappingCollisionError,
RegionNameCollisionError,
RegionNotDefinedError,
)
from nomenclature.processor.utils import get_relative_path

from pyam import IamDataFrame
from pyam.logging import adjust_log_level
from pydantic import BaseModel, root_validator, validate_arguments, validator
from pydantic.error_wrappers import ErrorWrapper
from pydantic.types import DirectoryPath, FilePath

AGG_KWARGS = PYAM_AGG_KWARGS + ["region-aggregation"]

Expand Down Expand Up @@ -233,14 +231,18 @@ def from_file(cls, file: Union[Path, str]):
def all_regions(self) -> List[str]:
# For the native regions we take the **renamed** (if given) names
nr_list = [x.target_native_region for x in self.native_regions or []]
cr_list = [x.name for x in self.common_regions or []]
return nr_list + cr_list
return nr_list + self.common_region_names

@property
def model_native_region_names(self) -> List[str]:
# List of the **original** model native region names
return [x.name for x in self.native_regions]

@property
def common_region_names(self) -> List[str]:
# List of the **original** model native region names
return [x.name for x in self.common_regions or []]

@property
def rename_mapping(self) -> Dict[str, str]:
return {r.name: r.target_native_region for r in self.native_regions or []}
Expand Down Expand Up @@ -323,6 +325,7 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame:
if model not in self.mappings:
logger.info(f"No region aggregation mapping found for model {model}")
processed_dfs.append(model_df)

# Otherwise we first rename, then aggregate
else:
# before aggregating, check that all regions are valid
Expand All @@ -332,14 +335,17 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame:
f"{self.mappings[model].file}"
)

with adjust_log_level(level="ERROR"): # silence empty filter
_processed_dfs = []

# Silence pyam's empty filter warnings
with adjust_log_level(logger="pyam", level="ERROR"):
# Rename
if self.mappings[model].native_regions is not None:
_df = model_df.filter(
region=self.mappings[model].model_native_region_names
)
if not _df.empty:
processed_dfs.append(
_processed_dfs.append(
_df.rename(region=self.mappings[model].rename_mapping)
)

Expand All @@ -358,7 +364,7 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame:
for cr in self.mappings[model].common_regions:
regions = [cr.name, cr.constituent_regions]
# First, perform 'simple' aggregation (no arguments)
processed_dfs.append(
_processed_dfs.append(
model_df.aggregate_region(vars_default_args, *regions)
)
# Second, special weighted aggregation
Expand All @@ -371,7 +377,7 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame:
**kwargs,
)
if _df is not None and not _df.empty:
processed_dfs.append(_df)
_processed_dfs.append(_df)
else:
for rename_var in kwargs["region-aggregation"]:
for _rename, _kwargs in rename_var.items():
Expand All @@ -382,10 +388,24 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame:
**_kwargs,
)
if _df is not None and not _df.empty:
processed_dfs.append(
_processed_dfs.append(
_df.rename(variable={var: _rename})
)

common_region_df = model_df.filter(
region=self.mappings[model].common_region_names,
variable=dsd.variable,
)

# concatenate and merge with data provided at common-region level
if _processed_dfs:
processed_dfs.append(
_merge_with_provided_data(_processed_dfs, common_region_df)
)
# if data exists only at the common-region level
elif not common_region_df.empty:
processed_dfs.append(common_region_df)

if not processed_dfs:
raise ValueError(
f"The region aggregation for model {model} resulted in an empty dataset"
Expand All @@ -409,8 +429,32 @@ def _aggregate_region(df, var, *regions, **kwargs):
return df.aggregate_region(var, *regions, **kwargs)
except ValueError as e:
if str(e) == "Inconsistent index between variable and weight!":
logger.warning(
f"Could not aggregate '{var}' for region {regions[0]} ({kwargs})"
logger.info(
f"Could not aggregate '{var}' for region '{regions[0]}' ({kwargs})"
)
else:
raise e


def _merge_with_provided_data(
_processed_df: IamDataFrame, common_region_df: IamDataFrame
) -> IamDataFrame:
"""Compare and merge provided and aggregated results"""

# validate that aggregated data matches to original data
aggregate_df = pyam.concat(_processed_df)
compare = pyam.compare(
common_region_df,
aggregate_df,
left_label="common-region",
right_label="aggregation",
)
# drop all data which is not in both data frames
diff = compare.dropna()

if diff is not None and len(diff):
logging.warning("Difference between original and aggregated data:\n" f"{diff}")

# merge aggregated data onto original common-region data
index = aggregate_df._data.index.difference(common_region_df._data.index)
return common_region_df.append(aggregate_df._data[index])
2 changes: 2 additions & 0 deletions tests/data/region_processing/dsd/region/regions.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
- common:
- World
- common_region_A
- common_region_B
- model_native:
- region_A
- region_B
Expand Down
20 changes: 20 additions & 0 deletions tests/data/region_processing/dsd/variable/variables.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,23 @@
- Share|Coal:
definition: Share of Coal in the total primary energy mix
unit:
- Temperature|Mean:
definition: Global mean temperature
unit: C
- Skip-Aggregation:
definition: Test variable to be skipped during aggregation
unit: EJ/yr
skip-region-aggregation: true
- Variable A:
definition: Test variable to be used for computing a max aggregate
unit: EJ/yr
region-aggregation:
- Variable A (max):
method: max
- Variable A (max):
unit: EJ/yr
- Variable B:
unit: EJ/yr
region-aggregation:
- Variable B:
method: max
5 changes: 5 additions & 0 deletions tests/data/region_processing/partial_aggregation/model_a.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
model: m_a
common_regions:
- World:
- region_A
- region_B
Loading

0 comments on commit c03f9d3

Please sign in to comment.