# Example 4 - AGGREGATION

This example builds on example 3, and demonstrates how to do aggregations on the data

In [1]:
# This is the required code from example 3 to get the same data structures

from pathlib import Path
from aqua_tracekit import SdtModel, SdtSchema, Aggregation
from datetime import datetime
import polars as pl

base_path = Path("../example_3_mix/data")
model = SdtModel(base_path=str(base_path.resolve()))

df_containers = model.load_containers("containers.csv")
df_populations = model.load_populations("populations.csv")
df_transfers = model.load_transfers("transfers.csv")


df_cage_o2 = model.load_container_timeseries("o2.csv")
df_cage_o2 = SdtModel.parse_float(df_cage_o2, "O2_mg_per_litre")
df_pop_o2 = model.map_container_data_to_populations(df_cage_o2, include_unmatched=False, allow_multiple=False)

trace_time = datetime(2025,9,10,16,0,0)
df_origin_populations = model.get_populations_active_at(trace_time)
df_traceability_index = model.trace_populations(df_origin_populations)
df_traced_data = model.add_data_to_trace(df_pop_o2, df_traceability_index)

## Aggregating functions
In example 3, we used a simple ´.pivot_table´ to group and aggregate. Default aggregation in .pivot_table is mean() for the values. For some use-cases, this will be fine, but normally 
we use slighlty more sophisticated aggregation functions.

The model has a few built-in aggregation functions

The first example will be an aggregate function that will do a MIN() of the values:


In [2]:
aggs = [
    Aggregation.min("O2_mg_per_litre")
]
df_grouped_data = model.aggregate_traced_data(df_traced_data, aggs)
df_grouped_data.sort(SdtSchema.TimeSeries.DATE_TIME)

df_grouped_data.head(11)

origin_population_id,date_time,O2_mg_per_litre_min
str,datetime[μs],f64
"""P3""",2025-09-07 16:00:00,4.0
"""P3""",2025-09-03 16:00:00,4.0
"""P3""",2025-09-09 16:00:00,4.0
"""P3""",2025-09-06 16:00:00,5.0
"""P3""",2025-09-08 16:00:00,5.0
"""P3""",2025-09-02 16:00:00,5.0
"""P3""",2025-09-04 16:00:00,5.0
"""P3""",2025-09-05 16:00:00,4.0
"""P3""",2025-09-01 16:00:00,4.0
"""P3""",2025-09-10 16:00:00,5.0


this will return correctly 4 as the minimum O2 on 2025-09-03. We can do the same for max:

In [3]:
aggs = [
    Aggregation.max("O2_mg_per_litre")
]
df_grouped_data = model.aggregate_traced_data(df_traced_data, aggs)
df_grouped_data.sort(SdtSchema.TimeSeries.DATE_TIME)

df_grouped_data.head(11)

origin_population_id,date_time,O2_mg_per_litre_max
str,datetime[μs],f64
"""P3""",2025-09-03 16:00:00,11.0
"""P3""",2025-09-04 16:00:00,9.0
"""P3""",2025-09-10 16:00:00,5.0
"""P3""",2025-09-06 16:00:00,5.0
"""P3""",2025-09-02 16:00:00,5.0
"""P3""",2025-09-09 16:00:00,4.0
"""P3""",2025-09-08 16:00:00,5.0
"""P3""",2025-09-01 16:00:00,4.0
"""P3""",2025-09-07 16:00:00,4.0
"""P3""",2025-09-05 16:00:00,4.0


Or combine both:

In [4]:
aggs = [
    Aggregation.min("O2_mg_per_litre"),
    Aggregation.max("O2_mg_per_litre")
]
df_grouped_data = model.aggregate_traced_data(df_traced_data, aggs)
df_grouped_data.sort(SdtSchema.TimeSeries.DATE_TIME)

df_grouped_data.head(11)

origin_population_id,date_time,O2_mg_per_litre_min,O2_mg_per_litre_max
str,datetime[μs],f64,f64
"""P3""",2025-09-04 16:00:00,5.0,9.0
"""P3""",2025-09-06 16:00:00,5.0,5.0
"""P3""",2025-09-01 16:00:00,4.0,4.0
"""P3""",2025-09-03 16:00:00,4.0,11.0
"""P3""",2025-09-10 16:00:00,5.0,5.0
"""P3""",2025-09-08 16:00:00,5.0,5.0
"""P3""",2025-09-07 16:00:00,4.0,4.0
"""P3""",2025-09-09 16:00:00,4.0,4.0
"""P3""",2025-09-02 16:00:00,5.0,5.0
"""P3""",2025-09-05 16:00:00,4.0,4.0


Weighted average is often used. In our model, this is simple to achieve, as the share(s) are directly expressed in the traceability_matrix.

Here is an example using a weighted average based on number of fish:


In [5]:
aggs = [
    Aggregation.weighted_avg("O2_mg_per_litre", SdtSchema.AGGREGATE_BY.COUNT)
]
df_grouped_data = model.aggregate_traced_data(df_traced_data, aggs)
df_grouped_data = df_grouped_data.sort(SdtSchema.TimeSeries.DATE_TIME)

df_grouped_data.head(11)


origin_population_id,date_time,O2_mg_per_litre_weighted_avg
str,datetime[μs],f64
"""P3""",2025-09-01 16:00:00,4.0
"""P3""",2025-09-02 16:00:00,5.0
"""P3""",2025-09-03 16:00:00,10.3
"""P3""",2025-09-04 16:00:00,8.6
"""P3""",2025-09-05 16:00:00,4.0
"""P3""",2025-09-06 16:00:00,5.0
"""P3""",2025-09-07 16:00:00,4.0
"""P3""",2025-09-08 16:00:00,5.0
"""P3""",2025-09-09 16:00:00,4.0
"""P3""",2025-09-10 16:00:00,5.0


We can also supply custom aggregation functions in python. 

In this example information about the underlying populations and shares is conatenated into a text.0

In [6]:
def my_agg(group: pl.DataFrame) -> dict:
    rows = group.select(["container_id", "traced_population_id", "share_count_backward"])
    parts = [f"{r['container_id']}:{r['traced_population_id']}:{r['share_count_backward']}" 
             for r in rows.iter_rows(named=True)]
    return {"contribution": ", ".join(parts)}

aggs = [
    Aggregation.weighted_avg("O2_mg_per_litre", aggregate_by=SdtSchema.AGGREGATE_BY.COUNT),
    Aggregation.custom(my_agg),
]
result = SdtModel.aggregate_traced_data(df_traced_data, aggs)
result = result.sort(SdtSchema.TimeSeries.DATE_TIME)
with pl.Config(tbl_rows=100):
    print(result)

shape: (10, 4)
┌──────────────────────┬─────────────────────┬──────────────────────────────┬──────────────────────┐
│ origin_population_id ┆ date_time           ┆ O2_mg_per_litre_weighted_avg ┆ contribution         │
│ ---                  ┆ ---                 ┆ ---                          ┆ ---                  │
│ str                  ┆ datetime[μs]        ┆ f64                          ┆ str                  │
╞══════════════════════╪═════════════════════╪══════════════════════════════╪══════════════════════╡
│ P3                   ┆ 2025-09-01 16:00:00 ┆ 4.0                          ┆ C1:P1:0.1            │
│ P3                   ┆ 2025-09-02 16:00:00 ┆ 5.0                          ┆ C1:P1:0.1            │
│ P3                   ┆ 2025-09-03 16:00:00 ┆ 10.3                         ┆ C1:P1:0.1, C2:P2:0.9 │
│ P3                   ┆ 2025-09-04 16:00:00 ┆ 8.6                          ┆ C1:P1:0.1, C2:P2:0.9 │
│ P3                   ┆ 2025-09-05 16:00:00 ┆ 4.0                          

Note that built-in functions have better performance:

In [7]:
aggs = [
    Aggregation.weighted_avg("O2_mg_per_litre", aggregate_by=SdtSchema.AGGREGATE_BY.COUNT),
    Aggregation.contribution_breakdown([SdtSchema.Container.CONTAINER_ID, SdtSchema.TraceabilityIndex.TRACED_POPULATION_ID, SdtSchema.TraceabilityIndex.FACTORS.SHARE_COUNT_BACKWARD])
]
result = SdtModel.aggregate_traced_data(df_traced_data, aggs)
result.sort(SdtSchema.TimeSeries.DATE_TIME)
with pl.Config(tbl_rows=100):
    print(result)

shape: (10, 4)
┌──────────────────────┬─────────────────────┬────────────────────────────┬────────────────────────┐
│ origin_population_id ┆ date_time           ┆ O2_mg_per_litre_weighted_a ┆ contribution_breakdown │
│ ---                  ┆ ---                 ┆ vg                         ┆ ---                    │
│ str                  ┆ datetime[μs]        ┆ ---                        ┆ str                    │
│                      ┆                     ┆ f64                        ┆                        │
╞══════════════════════╪═════════════════════╪════════════════════════════╪════════════════════════╡
│ P3                   ┆ 2025-09-03 16:00:00 ┆ 10.3                       ┆ C1:P1:0.1, C2:P2:0.9   │
│ P3                   ┆ 2025-09-08 16:00:00 ┆ 5.0                        ┆ C1:P3:1.0              │
│ P3                   ┆ 2025-09-06 16:00:00 ┆ 5.0                        ┆ C1:P3:1.0              │
│ P3                   ┆ 2025-09-05 16:00:00 ┆ 4.0                        ┆ 

We can also group by other columns, as long as the aggregation functions can handle it. 

Lets aggregate data by adding a week column and group by origin_node and week. 

In this case; min and max aggregation functions, this will work as expected. 

But note that this is an aggregation over time as well as over populations. We can quickly imagine scenarioes where this would produce unexpected results, depending on underlying data and chosen aggregation functions.

When aggregating over time and population - you might want to determine wether to fill gaps and reduce values to matching resolution across populations, and maybe also use time-aware aggregation functions.

In [8]:
# add week column if not done already
week_col = "week_number"

df_traced_data_week = df_traced_data.with_columns(
    pl.col(SdtSchema.TimeSeries.DATE_TIME).dt.week().alias(week_col)
)

# aggregation min and max
aggs =[
        Aggregation.min("O2_mg_per_litre"),
        Aggregation.max("O2_mg_per_litre")
    ]

# group by week number and origin node
df_agg_data = model.aggregate_traced_data(df_traced_data_week, aggs, [SdtSchema.TraceabilityIndex.ORIGIN_POPULATION_ID, week_col])
df_agg_data.head(11)

origin_population_id,week_number,O2_mg_per_litre_min,O2_mg_per_litre_max
str,i8,f64,f64
"""P3""",37,4.0,5.0
"""P3""",36,4.0,11.0
