diff --git a/doc/source/api.rst b/doc/source/api.rst index 8b02b20c..35d9198f 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -8,7 +8,7 @@ API documentation .. _toplevel-functions: Top-level functions ----------------- +------------------- .. autofunction:: process diff --git a/doc/source/usage.rst b/doc/source/usage.rst index f8fd188f..3c8d04a9 100644 --- a/doc/source/usage.rst +++ b/doc/source/usage.rst @@ -240,3 +240,46 @@ This example illustrates how such a model mapping looks like: The names of the constituent regions **must** refer to the **original** model native region names. In the above example *region_a* and *region_b* and **not** *alternative_name_a*. + +Partial region aggregation +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +During the region aggregation process, provided and aggregated data are combined in a +process dubbed 'partial region aggregation'. + +As an example, consider the following model mapping: + +.. code:: yaml + + model: model_a + common_regions: + - common_region_1: + - region_a + - region_b + +If the data provided for region aggregation contains results for *common_region_1*, they +are compared and combined according to the following logic: + +1. If a variable is **not** reported for *common_region_1*, it is calculated through + region aggregation of regions *region_a* and *region_b*. +2. If a variable is reported **only** at the *common_region_1* level, it is used directly. +3. If a variable is reported for *common_region_1* **as well as** *region_a* and + *region_b*, the **provided results** take **precedence** over the aggregated ones. + Additionally, the aggregation is computed and compared to the provided results. If + there are discrepancies, a warning is written to the logs. + + .. note:: + + Please note that in case of differences no error is raised. Therefore it is + necessary to check the logs to find out if there were any differences. This is + intentional since some differences might be expected. 
+ +More points to note: + +* The attribute `skip-region-aggregation` which can be set to `true` for variables as + part of a DataStructureDefinition has the following effect. If `true`, aggregation for + this variable is skipped. However, if the variable is part of the provided data, it + **is** used. +* The `region-aggregation` attribute works with partial region aggregation. If a + variable is found in the provided data, it is used over aggregated results. Any + discrepancies between the provided and aggregated data are written to the log. diff --git a/nomenclature/__init__.py b/nomenclature/__init__.py index d871b020..e3a1b3a3 100644 --- a/nomenclature/__init__.py +++ b/nomenclature/__init__.py @@ -12,8 +12,13 @@ RegionAggregationMapping, ) - # set up logging +logging.basicConfig( + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + logger = logging.getLogger(__name__) # get version number either from git (preferred) or metadata diff --git a/nomenclature/processor/region.py b/nomenclature/processor/region.py index c4ba7f80..a30ad72d 100644 --- a/nomenclature/processor/region.py +++ b/nomenclature/processor/region.py @@ -3,25 +3,23 @@ from pathlib import Path from typing import Dict, List, Optional, Set, Union +import jsonschema import pyam import pydantic import yaml -import jsonschema -from pyam import IamDataFrame -from pyam.logging import adjust_log_level -from pydantic import BaseModel, root_validator, validate_arguments, validator -from pydantic.types import DirectoryPath, FilePath -from pydantic.error_wrappers import ErrorWrapper - -from nomenclature.definition import DataStructureDefinition from nomenclature.codelist import PYAM_AGG_KWARGS +from nomenclature.definition import DataStructureDefinition from nomenclature.error.region import ( ModelMappingCollisionError, RegionNameCollisionError, RegionNotDefinedError, ) from nomenclature.processor.utils import get_relative_path - +from pyam import 
IamDataFrame +from pyam.logging import adjust_log_level +from pydantic import BaseModel, root_validator, validate_arguments, validator +from pydantic.error_wrappers import ErrorWrapper +from pydantic.types import DirectoryPath, FilePath AGG_KWARGS = PYAM_AGG_KWARGS + ["region-aggregation"] @@ -233,14 +231,18 @@ def from_file(cls, file: Union[Path, str]): def all_regions(self) -> List[str]: # For the native regions we take the **renamed** (if given) names nr_list = [x.target_native_region for x in self.native_regions or []] - cr_list = [x.name for x in self.common_regions or []] - return nr_list + cr_list + return nr_list + self.common_region_names @property def model_native_region_names(self) -> List[str]: # List of the **original** model native region names return [x.name for x in self.native_regions] + @property + def common_region_names(self) -> List[str]: + # List of the **original** model native region names + return [x.name for x in self.common_regions or []] + @property def rename_mapping(self) -> Dict[str, str]: return {r.name: r.target_native_region for r in self.native_regions or []} @@ -323,6 +325,7 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame: if model not in self.mappings: logger.info(f"No region aggregation mapping found for model {model}") processed_dfs.append(model_df) + # Otherwise we first rename, then aggregate else: # before aggregating, check that all regions are valid @@ -332,14 +335,17 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame: f"{self.mappings[model].file}" ) - with adjust_log_level(level="ERROR"): # silence empty filter + _processed_dfs = [] + + # Silence pyam's empty filter warnings + with adjust_log_level(logger="pyam", level="ERROR"): # Rename if self.mappings[model].native_regions is not None: _df = model_df.filter( region=self.mappings[model].model_native_region_names ) if not _df.empty: - processed_dfs.append( + _processed_dfs.append( 
_df.rename(region=self.mappings[model].rename_mapping) ) @@ -358,7 +364,7 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame: for cr in self.mappings[model].common_regions: regions = [cr.name, cr.constituent_regions] # First, perform 'simple' aggregation (no arguments) - processed_dfs.append( + _processed_dfs.append( model_df.aggregate_region(vars_default_args, *regions) ) # Second, special weighted aggregation @@ -371,7 +377,7 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame: **kwargs, ) if _df is not None and not _df.empty: - processed_dfs.append(_df) + _processed_dfs.append(_df) else: for rename_var in kwargs["region-aggregation"]: for _rename, _kwargs in rename_var.items(): @@ -382,10 +388,24 @@ def apply(self, df: IamDataFrame, dsd: DataStructureDefinition) -> IamDataFrame: **_kwargs, ) if _df is not None and not _df.empty: - processed_dfs.append( + _processed_dfs.append( _df.rename(variable={var: _rename}) ) + common_region_df = model_df.filter( + region=self.mappings[model].common_region_names, + variable=dsd.variable, + ) + + # concatenate and merge with data provided at common-region level + if _processed_dfs: + processed_dfs.append( + _merge_with_provided_data(_processed_dfs, common_region_df) + ) + # if data exists only at the common-region level + elif not common_region_df.empty: + processed_dfs.append(common_region_df) + if not processed_dfs: raise ValueError( f"The region aggregation for model {model} resulted in an empty dataset" @@ -409,8 +429,32 @@ def _aggregate_region(df, var, *regions, **kwargs): return df.aggregate_region(var, *regions, **kwargs) except ValueError as e: if str(e) == "Inconsistent index between variable and weight!": - logger.warning( - f"Could not aggregate '{var}' for region {regions[0]} ({kwargs})" + logger.info( + f"Could not aggregate '{var}' for region '{regions[0]}' ({kwargs})" ) else: raise e + + +def _merge_with_provided_data( + _processed_df: 
IamDataFrame, common_region_df: IamDataFrame +) -> IamDataFrame: + """Compare and merge provided and aggregated results""" + + # validate that aggregated data matches to original data + aggregate_df = pyam.concat(_processed_df) + compare = pyam.compare( + common_region_df, + aggregate_df, + left_label="common-region", + right_label="aggregation", + ) + # drop all data which is not in both data frames + diff = compare.dropna() + + if diff is not None and len(diff): + logging.warning("Difference between original and aggregated data:\n" f"{diff}") + + # merge aggregated data onto original common-region data + index = aggregate_df._data.index.difference(common_region_df._data.index) + return common_region_df.append(aggregate_df._data[index]) diff --git a/tests/data/region_processing/dsd/region/regions.yaml b/tests/data/region_processing/dsd/region/regions.yaml index 28e01921..16508c52 100644 --- a/tests/data/region_processing/dsd/region/regions.yaml +++ b/tests/data/region_processing/dsd/region/regions.yaml @@ -1,5 +1,7 @@ - common: - World + - common_region_A + - common_region_B - model_native: - region_A - region_B diff --git a/tests/data/region_processing/dsd/variable/variables.yaml b/tests/data/region_processing/dsd/variable/variables.yaml index 39d429d7..ede21a94 100644 --- a/tests/data/region_processing/dsd/variable/variables.yaml +++ b/tests/data/region_processing/dsd/variable/variables.yaml @@ -7,3 +7,23 @@ - Share|Coal: definition: Share of Coal in the total primary energy mix unit: +- Temperature|Mean: + definition: Global mean temperature + unit: C +- Skip-Aggregation: + definition: Test variable to be skipped during aggregation + unit: EJ/yr + skip-region-aggregation: true +- Variable A: + definition: Test variable to be used for computing a max aggregate + unit: EJ/yr + region-aggregation: + - Variable A (max): + method: max +- Variable A (max): + unit: EJ/yr +- Variable B: + unit: EJ/yr + region-aggregation: + - Variable B: + method: max diff --git 
a/tests/data/region_processing/partial_aggregation/model_a.yaml b/tests/data/region_processing/partial_aggregation/model_a.yaml new file mode 100644 index 00000000..8a4ea676 --- /dev/null +++ b/tests/data/region_processing/partial_aggregation/model_a.yaml @@ -0,0 +1,5 @@ +model: m_a +common_regions: + - World: + - region_A + - region_B diff --git a/tests/test_core.py b/tests/test_core.py index f073716d..cb85a8bf 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -201,7 +201,7 @@ def test_region_processing_complete(directory): ), ], ) -def test_region_processing_weighted_aggregation(folder, exp_df, args): +def test_region_processing_weighted_aggregation(folder, exp_df, args, caplog): # test a weighed sum test_df = IamDataFrame( @@ -231,6 +231,13 @@ def test_region_processing_weighted_aggregation(folder, exp_df, args): ), ) assert_iamframe_equal(obs, exp) + # check the logs since the presence of args should cause a warning in the logs + if args: + logmsg = ( + "Could not aggregate 'Price|Carbon' for region 'World' " + "({'weight': 'Emissions|CO2'})" + ) + assert logmsg in caplog.text def test_region_processing_skip_aggregation(): @@ -255,3 +262,121 @@ def test_region_processing_skip_aggregation(): ), ) assert_iamframe_equal(obs, exp) + + +@pytest.mark.parametrize( + "input_data, exp_data, warning", + [ + ( # Variable is available in provided and aggregated data and the same + [ + ["m_a", "s_a", "region_A", "Primary Energy", "EJ/yr", 1, 2], + ["m_a", "s_a", "region_B", "Primary Energy", "EJ/yr", 3, 4], + ["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 4, 6], + ], + [["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 4, 6]], + None, + ), + ( # Variable is only available in the provided data + [ + ["m_a", "s_a", "region_A", "Primary Energy", "EJ/yr", 1, 2], + ["m_a", "s_a", "region_B", "Primary Energy", "EJ/yr", 3, 4], + ], + [["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 4, 6]], + None, + ), + ( # Variable is only available in the aggregated data + 
[["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 4, 6]], + [["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 4, 6]], + None, + ), + ( # Variable is not available in all scenarios in the provided data + [ + ["m_a", "s_a", "region_A", "Primary Energy", "EJ/yr", 1, 2], + ["m_a", "s_a", "region_B", "Primary Energy", "EJ/yr", 3, 4], + ["m_a", "s_b", "region_A", "Primary Energy", "EJ/yr", 5, 6], + ["m_a", "s_b", "region_B", "Primary Energy", "EJ/yr", 7, 8], + ["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 4, 6], + ], + [ + ["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 4, 6], + ["m_a", "s_b", "World", "Primary Energy", "EJ/yr", 12, 14], + ], + None, + ), + ( # Using skip-aggregation: true should only take provided results + [ + ["m_a", "s_a", "region_A", "Skip-Aggregation", "EJ/yr", 1, 2], + ["m_a", "s_a", "region_B", "Skip-Aggregation", "EJ/yr", 3, 4], + ["m_a", "s_a", "World", "Skip-Aggregation", "EJ/yr", 10, 11], + ], + [["m_a", "s_a", "World", "Skip-Aggregation", "EJ/yr", 10, 11]], + None, + ), + ( # Using the region-aggregation attribute to create an additional variable + [ + ["m_a", "s_a", "region_A", "Variable A", "EJ/yr", 1, 10], + ["m_a", "s_a", "region_B", "Variable A", "EJ/yr", 10, 1], + ["m_a", "s_a", "World", "Variable A", "EJ/yr", 11, 11], + ], + [ + ["m_a", "s_a", "World", "Variable A", "EJ/yr", 11, 11], + ["m_a", "s_a", "World", "Variable A (max)", "EJ/yr", 10, 10], + ], + None, + ), + ( # Variable is available in provided and aggregated data but different + [ + ["m_a", "s_a", "region_A", "Primary Energy", "EJ/yr", 1, 2], + ["m_a", "s_a", "region_B", "Primary Energy", "EJ/yr", 3, 4], + ["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 5, 6], + ], + [["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 5, 6]], + [ + "Difference between original and aggregated data:", + "m_a s_a World Primary Energy", + "2005 5 4", + ], + ), + ( # Conflict between overlapping renamed variable and provided data + [ + ["m_a", "s_a", "region_A", "Variable B", 
"EJ/yr", 1, 2], + ["m_a", "s_a", "region_B", "Variable B", "EJ/yr", 3, 4], + ["m_a", "s_a", "World", "Variable B", "EJ/yr", 4, 6], + ], + [["m_a", "s_a", "World", "Variable B", "EJ/yr", 4, 6]], + [ + "Difference between original and aggregated data:", + "m_a s_a World Variable B EJ/yr", + "2005 4 3", + ], + ), + ], +) +def test_partial_aggregation(input_data, exp_data, warning, caplog): + # Dedicated test for partial aggregation + # Test cases are: + # * Variable is available in provided and aggregated data and the same + # * Variable is only available in the provided data + # * Variable is only available in the aggregated data + # * Variable is not available in all scenarios in the provided data + # * Using skip-aggregation: true should only take provided results + # * Using the region-aggregation attribute to create an additional variable + # * Variable is available in provided and aggregated data but different + + obs = process( + IamDataFrame(pd.DataFrame(input_data, columns=IAMC_IDX + [2005, 2010])), + DataStructureDefinition(TEST_DATA_DIR / "region_processing/dsd"), + processor=RegionProcessor.from_directory( + TEST_DATA_DIR / "region_processing/partial_aggregation" + ), + ) + exp = IamDataFrame(pd.DataFrame(exp_data, columns=IAMC_IDX + [2005, 2010])) + + # Assert that we get the expected values + assert_iamframe_equal(obs, exp) + + # Assert that we get the correct warnings + if warning is None: + assert "WARNING" not in caplog.text + else: + assert all(c in caplog.text for c in warning)