# Writing a custom aggregator

In this demo, we look at how to write a new aggregator and include it in enbios.

We are not going to write a whole new aggregator class from scratch; instead, we create one derived from the existing `SumAggregator` class.
If you are not familiar with the concept of class inheritance, you can read about it in the [Python tutorial on class inheritance](https://docs.python.org/3/tutorial/classes.html#inheritance) or in this [simple explanation](https://pythonbasics.org/inheritance/).

In our example, that means that our new aggregator (`ThresholdAggregator`) behaves in exactly the same way as the `SumAggregator`, except where we define it to do otherwise.

Concretely, we create an aggregator that behaves exactly like the `SumAggregator`, but for nodes using this aggregator we can define thresholds for individual methods. The result data will then include boolean values indicating whether each threshold has been reached or exceeded. We therefore do not change the results in any way, but augment them with additional information.
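The idea of "inherit everything, override one method, leave the result untouched" can be sketched without enbios. The classes `BaseSum` and `ThresholdSum` below are hypothetical stand-ins invented for this illustration; they are not the enbios classes and do not share their interface.

```python
class BaseSum:
    """Hypothetical stand-in for a sum aggregator (not the enbios class)."""

    def aggregate(self, values: list) -> float:
        return float(sum(values))


class ThresholdSum(BaseSum):
    """Inherits the summing behaviour and records one extra boolean."""

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold
        self.reached = None  # set on every call to aggregate()

    def aggregate(self, values: list) -> float:
        total = super().aggregate(values)  # the sum itself is left unchanged
        self.reached = total >= self.threshold  # additional information only
        return total


agg = ThresholdSum(threshold=300)
total = agg.aggregate([100, 100])
print(total, agg.reached)
```

The subclass returns exactly what the base class computes; the boolean is stored on the side, which mirrors how the demo augments results instead of altering them.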

For each new aggregator (and adapter) that we want to add to enbios, we need to provide the path to the module (Python file) containing the aggregator class. You can find the `threshold_aggregator.py` file in the `data` directory of the demo folder of this repository.

In [1]:
from pathlib import Path

import pandas as pd

module_path = Path() / "data/threshold_aggregator.py"
module_path.exists()

True

Let's begin by defining a simple experiment config. We are using the assignment adapter, so that the calculation of the results takes no additional time. 

In [2]:
from enbios import Experiment

exp_config = {
    "adapters": [{"adapter_name": "assignment-adapter", "methods": {"co2": "kg"}}],
    "hierarchy": {
        "name": "root",
        "aggregator": "sum",
        "children": [
            {"name": "n1", "adapter": "assign", "config": {"outputs": [{"unit": "kg"}]}},
            {"name": "n2", "adapter": "assign", "config": {"outputs": [{"unit": "l"}]}},
        ],
    },
    "scenarios": [
        {
            "name": "scenario1",
            "nodes": {
                "n1": {
                    "outputs": [{"magnitude": 100}],
                    "impacts": {"co2": {"unit": "kg", "magnitude": 100}},
                },
                "n2": {
                    "outputs": [{"magnitude": 100}],
                    "impacts": {"co2": {"unit": "kg", "magnitude": 100}},
                },
            },
        }
    ],
}

Experiment(exp_config).run()

2024-05-13 11:04:25,954 - demos.enbios.base - INFO - Running scenario 'scenario1'


{'scenario1': {'name': 'root',
  'results': {'co2': {'unit': 'kg', 'magnitude': 200.0}},
  'output': [{'unit': 'kilogram', 'magnitude': 100.0, 'label': None},
   {'unit': 'liter', 'magnitude': 100.0, 'label': None}],
  'children': [{'name': 'n1',
    'results': {'co2': {'unit': 'kg', 'magnitude': 100.0}},
    'output': [{'unit': 'kg', 'magnitude': 100.0, 'label': None}]},
   {'name': 'n2',
    'results': {'co2': {'unit': 'kg', 'magnitude': 100.0}},
    'output': [{'unit': 'l', 'magnitude': 100.0, 'label': None}]}]}}

Now we add an aggregator to our experiment config and point it to the module path of our new aggregator. Rerunning the experiment does not change anything: the aggregator is integrated into the config and loaded, but no node makes use of it yet.

In [3]:
path_string = module_path.as_posix()

exp_config["aggregators"] = [
    {"aggregator_name": "sum-threshold", "module_path": path_string}
]

exp = Experiment(exp_config)
exp.run()

2024-05-13 11:04:25,969 - demos.enbios.base - INFO - Running scenario 'scenario1'


{'scenario1': {'name': 'root',
  'results': {'co2': {'unit': 'kg', 'magnitude': 200.0}},
  'output': [{'unit': 'kilogram', 'magnitude': 100.0, 'label': None},
   {'unit': 'liter', 'magnitude': 100.0, 'label': None}],
  'children': [{'name': 'n1',
    'results': {'co2': {'unit': 'kg', 'magnitude': 100.0}},
    'output': [{'unit': 'kg', 'magnitude': 100.0, 'label': None}]},
   {'name': 'n2',
    'results': {'co2': {'unit': 'kg', 'magnitude': 100.0}},
    'output': [{'unit': 'l', 'magnitude': 100.0, 'label': None}]}]}}

Now, let's add the aggregator to the root node and specify a threshold for the `co2` method.

In [4]:
exp_config["hierarchy"]["aggregator"] = "threshold"
exp_config["hierarchy"]["config"] = {
    "method_thresholds": [{"method": "co2", "threshold": 300}]
}

exp = Experiment(exp_config)
result = exp.run()
result

2024-05-13 11:04:25,981 - demos.enbios.base - INFO - Running scenario 'scenario1'


{'scenario1': {'name': 'root',
  'results': {'co2': {'unit': 'kg', 'magnitude': 200.0}},
  'output': [{'unit': 'kilogram', 'magnitude': 100.0, 'label': None},
   {'unit': 'liter', 'magnitude': 100.0, 'label': None}],
  'threshold_results': {'co2': False},
  'children': [{'name': 'n1',
    'results': {'co2': {'unit': 'kg', 'magnitude': 100.0}},
    'output': [{'unit': 'kg', 'magnitude': 100.0, 'label': None}]},
   {'name': 'n2',
    'results': {'co2': {'unit': 'kg', 'magnitude': 100.0}},
    'output': [{'unit': 'l', 'magnitude': 100.0, 'label': None}]}]}}

In [5]:
result["scenario1"]["threshold_results"]

{'co2': False}

In [6]:
exp.results_to_csv("temp.csv", flat_hierarchy=True)

pd.read_csv("temp.csv").fillna("")

Unnamed: 0,node_name,level,parent_name,results_co2_unit,results_co2_magnitude,output_0_unit,output_0_magnitude,output_0_label,output_1_unit,output_1_magnitude,output_1_label,threshold_results_co2
0,root,0,,kg,200.0,kilogram,100.0,,liter,100.0,,False
1,n1,1,root,kg,100.0,kg,100.0,,,,,
2,n2,1,root,kg,100.0,l,100.0,,,,,


In [7]:
# this is the whole module content:

from typing import Any

from pydantic import BaseModel, Field

from enbios import BasicTreeNode, ScenarioResultNodeData, ResultValue
from enbios.base.adapters_aggregators.builtin import SumAggregator


class MethodThreshold(BaseModel):
    method: str
    threshold: float


class NodeThresholdConfig(BaseModel):
    method_thresholds: list[MethodThreshold] = Field(default_factory=list)


class ThresholdAggregator(SumAggregator):
    def __init__(self):
        super().__init__()
        self.node_thresholds: dict[str, NodeThresholdConfig] = {}
        self.threshold_results: dict[str, dict[str, bool]] = {}

    def validate_node(self, node_name: str, node_config: Any):
        if node_config:
            self.node_thresholds[node_name] = NodeThresholdConfig.model_validate(
                node_config
            )

    def name(self) -> str:
        return "sum-threshold-aggregator"

    def node_indicator(self) -> str:
        return "threshold"

    def aggregate_node_result(
        self, node: BasicTreeNode[ScenarioResultNodeData], scenario_name: str
    ) -> dict[str, ResultValue]:
        sum_ = super().aggregate_node_result(node, scenario_name)
        if node.name in self.node_thresholds:
            node_thresholds = self.node_thresholds[node.name]
            self.threshold_results[node.name] = {}
            for method_threshold in node_thresholds.method_thresholds:
                if method_threshold.method in sum_:
                    method = method_threshold.method
                    self.threshold_results[node.name][method] = (
                        sum_[method].magnitude >= method_threshold.threshold
                    )
        return sum_

    def result_extras(self, node_name: str, scenario_name: str) -> dict[str, Any]:
        results = self.threshold_results.get(node_name, {})
        if results:
            return {"threshold_results": results}
        else:
            return {}

The aggregator itself requires no specific configuration. However, it needs to store the configs of the nodes for which we want to use thresholds. To guarantee the validity of each node config, we add two model classes (pydantic `BaseModel`s) that take care of the validation for us. Each node's config can have a list of `MethodThreshold` objects (which are created from the dictionaries we pass).

In the `validate_node` method, we store the config in a dictionary, where the key is the node's name and the value is the `NodeThresholdConfig` object.
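To make the validate-then-store step more tangible, here is a rough sketch that uses only standard-library dataclasses in place of pydantic. It is an approximation of the idea, not of pydantic's actual validation rules; the names `MethodThresholdSketch`, `NodeThresholdConfigSketch` and `validate_node_config` are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class MethodThresholdSketch:
    method: str
    threshold: float


@dataclass
class NodeThresholdConfigSketch:
    method_thresholds: list = field(default_factory=list)


def validate_node_config(raw: dict) -> NodeThresholdConfigSketch:
    """Turn a raw config dict into typed objects, raising on bad input."""
    thresholds = []
    for entry in raw.get("method_thresholds", []):
        # A missing key raises KeyError; a non-numeric string raises ValueError.
        thresholds.append(
            MethodThresholdSketch(
                method=str(entry["method"]), threshold=float(entry["threshold"])
            )
        )
    return NodeThresholdConfigSketch(method_thresholds=thresholds)


# node name -> validated config, analogous to what validate_node stores
node_thresholds = {}
node_thresholds["root"] = validate_node_config(
    {"method_thresholds": [{"method": "co2", "threshold": 300}]}
)
print(node_thresholds["root"])
```

Validating once, when the node is registered, means the aggregation step can rely on well-formed threshold objects and does not need to re-check the raw dictionaries.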

In the constructor, we also create a dictionary (`threshold_results`) in which we store the results of the threshold checks.
The most crucial method that we override is `aggregate_node_result`. The first thing we do in it is call `super().aggregate_node_result(node, scenario_name)`, which means that we calculate the aggregated result of the node with the sum aggregator. Afterward, if the node specifies any thresholds in its config, we iterate over them, check whether each summed magnitude is greater than or equal to its threshold, and store the outcomes in the `threshold_results` dictionary.

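Finally, in `result_extras`, we return the threshold results stored for the given node in the `threshold_results` dictionary; they appear in the result data under the key `threshold_results`. Nodes without threshold results contribute no extra data.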
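The comparison at the heart of `aggregate_node_result` can be isolated into a few lines. This is a simplified sketch with plain dictionaries and floats instead of the enbios result objects; `check_thresholds` is a hypothetical helper, not part of the module.

```python
def check_thresholds(sums: dict, thresholds: list) -> dict:
    """Return {method: bool} for every threshold whose method has a sum."""
    results = {}
    for entry in thresholds:
        method = entry["method"]
        if method in sums:  # thresholds for methods without a sum are skipped
            results[method] = sums[method] >= entry["threshold"]
    return results


thresholds = [{"method": "co2", "threshold": 300}]
below = check_thresholds({"co2": 200.0}, thresholds)
equal = check_thresholds({"co2": 300.0}, thresholds)
above = check_thresholds({"co2": 700.0}, thresholds)
print(below, equal, above)
```

Because the comparison is `>=`, a sum that exactly equals the threshold is also flagged as `True`.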




In [8]:
exp_config["scenarios"].append(
    {
        "name": "scenario2",
        "nodes": {
            "n1": {
                "outputs": [{"magnitude": 190}],
                "impacts": {"co2": {"unit": "kg", "magnitude": 200}},
            },
            "n2": {
                "outputs": [{"magnitude": 300}],
                "impacts": {"co2": {"unit": "kg", "magnitude": 500}},
            },
        },
    }
)

In [9]:
# Running the experiment again with the second scenario included, we see that in the second scenario the threshold is surpassed and the threshold value is marked True.
exp = Experiment(exp_config)
exp.run()

exp.results_to_csv(
    "temp.csv", flat_hierarchy=True, include_output=False, include_method_units=False
)

pd.read_csv("temp.csv").fillna("")

2024-05-13 11:04:26,042 - demos.enbios.base - INFO - Running scenario 'scenario1'
2024-05-13 11:04:26,043 - demos.enbios.base - INFO - Running scenario 'scenario2'


Unnamed: 0,scenario,node_name,level,parent_name,results_co2_magnitude,threshold_results_co2
0,scenario1,root,0,,200.0,False
1,scenario1,n1,1,root,100.0,
2,scenario1,n2,1,root,100.0,
3,scenario2,root,0,,700.0,True
4,scenario2,n1,1,root,200.0,
5,scenario2,n2,1,root,500.0,


Next, we make some adjustments to our experiment. We use a modified version of the threshold aggregator, extended so that the threshold can be specified for each scenario. A scenario config for a node using the scenario-sum-threshold aggregator can pass the same threshold config as in the hierarchy. The config in the hierarchy is used as the default, but if a config is specified in a scenario, the node uses that config instead. We can now pass a new config for the root node in `scenario2`, where we specify a higher threshold for `co2`.

In [13]:
exp_config["aggregators"][0] = {
    "aggregator_name": "scenario-sum-threshold",
    "module_path": Path() / "data/threshold_aggregator_scenarios.py",
}

exp_config["hierarchy"]["aggregator"] = "scenario-threshold"

exp_config["scenarios"][1]["nodes"]["root"] = {
    "method_thresholds": [{"method": "co2", "threshold": 1000}]
}

exp = Experiment(exp_config)
exp.run()

exp.results_to_csv(
    "temp.csv", flat_hierarchy=True, include_output=False, include_method_units=False
)
pd.read_csv("temp.csv").fillna("")

2024-05-13 11:15:46,598 - demos.enbios.base - INFO - Running scenario 'scenario1'
2024-05-13 11:15:46,605 - demos.enbios.base - INFO - Running scenario 'scenario2'


Unnamed: 0,scenario,node_name,level,parent_name,results_co2_magnitude,threshold_results_co2
0,scenario1,root,0,,200.0,False
1,scenario1,n1,1,root,100.0,
2,scenario1,n2,1,root,100.0,
3,scenario2,root,0,,700.0,False
4,scenario2,n1,1,root,200.0,
5,scenario2,n2,1,root,500.0,
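
The fallback behaviour described for the scenario-aware aggregator (hierarchy config as default, scenario config as override) can be sketched as follows. The contents of `threshold_aggregator_scenarios.py` are not shown in this demo, so the function `effective_thresholds` and the choice to replace the whole threshold list, rather than merge it per method, are assumptions made for illustration only.

```python
from typing import Optional


def effective_thresholds(default_config: dict, scenario_config: Optional[dict]) -> list:
    """Use the scenario's thresholds if given, otherwise the hierarchy default."""
    if scenario_config and scenario_config.get("method_thresholds"):
        return scenario_config["method_thresholds"]
    return default_config.get("method_thresholds", [])


hierarchy_config = {"method_thresholds": [{"method": "co2", "threshold": 300}]}
scenario_configs = {
    "scenario1": None,  # no override, falls back to the hierarchy config
    "scenario2": {"method_thresholds": [{"method": "co2", "threshold": 1000}]},
}
root_sums = {"scenario1": 200.0, "scenario2": 700.0}  # root co2 values from the demo

outcome = {}
for scenario, total in root_sums.items():
    for entry in effective_thresholds(hierarchy_config, scenario_configs[scenario]):
        outcome[(scenario, entry["method"])] = total >= entry["threshold"]
print(outcome)
```

With these inputs, `scenario2` is checked against 1000 instead of 300, which is consistent with its `False` entry in the last table.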
