# The openLCA Result API

openLCA 2 provides a harmonized result interface that can be called from
different APIs (Java, JSON-RPC, gRPC, Rest). All these APIs call the same
backend which is implemented in the openLCA kernel. This kernel is independent
from the openLCA user interface and can be integrated as a service into other
applications.

The idea of the result interface is not to provide some ready-to-use charts and
tables but to provide all possible building blocks with which such higher level
result views can be created (charts, tables, upstream trees, Sankey diagrams).
Thus, the result interface has many methods that often look quite similar but
they have their purpose for efficiently creating higher level result views. This
notebook tries to explain what these methods do with examples using the [openLCA
Python IPC API v2](https://github.com/GreenDelta/olca-ipc.py). The calculation
is sometimes explained using [standard LCA matrix algebra](.#) but this does mean
that the implementation uses exactly these formulas when calculating results.

## Setup

In order to run the examples in this document, you need to have the olca-ipc.py
module installed and an IPC server in openLCA runnig (`Tools > Developer tools >
IPC Server`). First, make sure that the `olca-ipc` and `olca-schema` modules are
installed. We also use `pandas` and `matplotlib` for visualisations:

In [None]:
!pip list | grep 'olca\|pandas\|matplotlib\|mplcyberpunk'

Now, we import the required modules and add some boilerplate to make the
creation of data frames a bit more convenient below:

In [None]:
from typing import Any, Callable, Iterable, Literal, TypeVar

import olca_ipc as ipc
import olca_schema as lca
import olca_schema.results as res
import pandas as pd
import matplotlib.pyplot as plt
import mplcyberpunk

plt.style.use("cyberpunk")
glow = mplcyberpunk.add_glow_effects

E = TypeVar("E")


def df(
    data: Iterable[Any], *cols: tuple[str, Callable[[E], Any]]
) -> pd.DataFrame:
    columns = [c[0] for c in cols]
    transformed = []
    for row in data:
        transformed.append([col[1](row) for col in cols])
    return pd.DataFrame(transformed, columns=columns)


## Calculation and result state

### `/calculate`

In order to run a calculation, we first need to create a calculation setup. In
this setup, we define the calculation target (a process or product system) and
configure options like the LCIA method that should be used, if life cycle costs
should be calculated or not, which parameters should be redefined etc. With this
setup we can start a calculation:

In [None]:
setup = res.CalculationSetup(
    target=lca.Ref(
        model_type="ProductSystem", id="7c328e9b-d8e3-402b-a1ac-95620d021b99"
    ),
    impact_method=lca.Ref(
        model_type="ImpactMethod", id="787c02f1-d1f2-36d6-8e06-2307cc3ebebc"
    ),
)
client = ipc.Client(8080)
result = client.calculate(setup)

### `/state`

Starting the calculation directly returns a result object but this result may
isn't ready yet. This is because when we start a calculation, it is first put in
a calculation queue and there may are other calculations that are still running
until our calculation is scheduled and then, we may have to wait a bit depending
how big the system is we calculate. We can inspect the result state and could
actively wait unit the `is_ready` attribute is `True`. The result can also be an
error so we should also check this:

In [None]:
result.get_state()

We can also just use the convenience method `wait_until_ready`:

In [None]:
result.wait_until_ready()

### `/dispose`
We will call the method `dispose` at the end of this notebook but mention it
already here: a calculated result can allocate quite some resources in the
openLCA backend depending on the calculated system. If we do not need the result
anymore, we should dispose the result to free these resources especially, if we
want to run multiple calculations.

In [None]:
# we do this later
# result.dispose()

## Result elements

### `/tech-flows`

Technosphere flows are the product and waste flows of the product system by
which the processes in that system are connected. Each technosphere flow is a
pair of a provider and a flow so that the same flow could have different
providers (e.g. electricity produced by different processes). Providers are
typically processes but could also be sub-systems of the product system or even
precalculated results. The technosphere matrix $A$ is symmetrically indexed by
these $n$ technosphere flows.

$$
A \in \mathbb{R}^{n \times n}
$$

In [None]:
tech_flows = result.get_tech_flows()
df(
    tech_flows,
    ("Provider", lambda tf: tf.provider.name),
    ("Flow", lambda tf: tf.flow.name),
).head()


Results related to technosphere flows are returned as instances of the type
[TechFlowValue](http://greendelta.github.io/olca-schema/classes/TechFlowValue.html).
We map such results to data frames with the following function:

In [None]:
def tech_values(
    data: list[res.TechFlowValue],
    unit: Callable[
        [res.TechFlowValue], Any
    ] = lambda v: v.tech_flow.flow.ref_unit,
    sort: Literal["asc", "desc"] | None = None,
) -> pd.DataFrame:
    if sort:
        data.sort(key=lambda v: v.amount, reverse=sort == "desc")
    return df(
        data,
        ("Provider", lambda v: v.tech_flow.provider.name),
        ("Flow", lambda v: v.tech_flow.flow.name),
        ("Amount", lambda v: v.amount),
        ("Unit", unit),
    )


### `/envi-flows`

Intervention flows cross the boundary of the product system (the technosphere)
with its environment. The amounts of these flows form the inventory result of
the system. These are typically elementary flows but also unlinked products or
waste flows could occur here. In case of a regionalized calculation,
intervention flows can have locations attached that specify where this
intervention (crossing the boundary to the environment) happens. The values of
the $m$ intervention flows are stored in the intervention matrix $B$:

$$
B \in \mathbb{R}^{m \times n}
$$

In [None]:
envi_flows = result.get_envi_flows()
df(envi_flows,
  ("Is input?", lambda ef: ef.is_input),
  ("Flow", lambda ef: ef.flow.name),
  ("Category", lambda ef: ef.flow.category)).head()

In [None]:
def envi_values(
    data: list[res.EnviFlowValue],
    unit: Callable[
        [res.EnviFlowValue], Any
    ] = lambda v: v.envi_flow.flow.ref_unit,
    sort: Literal["asc", "desc"] | None = None,
) -> pd.DataFrame:
    if sort:
        data.sort(key=lambda v: v.amount, reverse=sort == "desc")
    return df(
        data,
        ("Flow", lambda v: v.envi_flow.flow.name),
        ("Category", lambda v: v.envi_flow.flow.category),
        ("Amount", lambda v: v.amount),
        ("Unit", unit),
    )


### `/impact-categories`

Depending on the selected impact assessment method in the calculation setup,
there ares $k$ impact categories for which an impact assessment result is
calculated. The characterization factors of these $k$ impact categories for the
$m$ flows in the system are stored in the impact matrix $C$:

$$
C \in \mathbb{R}^{k \times n}
$$

In [None]:
impact_categories = result.get_impact_categories()
df(impact_categories, ("Impact category", lambda i: i.name))

In [None]:
def impact_values(
    data: list[res.ImpactValue],
    unit: Callable[
        [res.ImpactValue], Any
    ] = lambda v: v.impact_category.ref_unit,
    sort: Literal["asc", "desc"] | None = None,
) -> pd.DataFrame:
    if sort:
        data.sort(key=lambda v: v.amount, reverse=sort == "desc")
    return df(
        data,
        ("Impact category", lambda v: v.impact_category.name),
        ("Amount", lambda v: v.amount),
        ("Unit", unit),
    )

## Technosphere flows

### `/total-requirements`

This method returns the total requirements $t$ of technosphere flows to
fulfill the demand of a product system. The amounts are given in the respective
reference units of the technosphere flows. $t$ can be calculated by scaling the
diagonal of the technosphere matrix $A$ with the scaling vector $s$:

$$
t = diag(s) * diag(A)
$$

In [None]:
t = result.get_total_requirements()
tech_values(t).head()

### `/total-requirements-of/{tech-flow}`

In [None]:
result.get_total_requirements_of(tech_flows[42])

## Inventory results

### `/total-flows`

The total inventory result $g$ contains the total amounts of the intervention
flows that cross the boundary with the environment. It can be calculated by
summing up the scaled interventions of the processes in the system:

$$
g = B * s
$$

In [None]:
g = result.get_total_flows()
envi_values(g).head()

### `/total-flow-value-of/{envi-flow}`

This method returns the total inventory result $g_i$ of a intervention flow $i$:

In [None]:
assert g[42].amount == result.get_total_flow_value_of(g[42].envi_flow)
result.get_total_flow_value_of(envi_flows[42])

### `/direct-flow-values-of/{envi-flow}`

If we want to see the direct contributions of the processes in the system to the inventory result of a flow $i$, we can scale the row $i$ of the intervention matrix $B$ with the scaling vector $s$:

$$
g_i^t = B[i, :] * diag(s)
$$


In [None]:
i = envi_flows[42]
unit = i.flow.ref_unit
gi = result.get_direct_flow_values_of(i)
plot = tech_values(gi, unit=lambda _: unit).plot(
  y="Amount", title=i.flow.name, ylabel=unit
)
glow()

### `/total-flow-values-of/{envi-flow}`

This method returns the total result values of an intervention flow for the
processes in a product system. Such a total result for a process $j$ ...

$$
M[i,:] * diagm(tf)
$$

In [None]:
i = next(filter(lambda f: "Carbon diox" in f.flow.name, envi_flows))
unit = i.flow.ref_unit
vals = result.get_total_flow_values_of(i)
tech_values(vals, unit=lambda _: unit, sort="asc").plot(
  y="Amount", title=i.flow.name, ylabel=unit
)
glow()


### `/direct-flows-of/{tech-flow}`

$$
G[:,j]
$$

In [None]:
envi_values(result.get_direct_flows_of(tech_flows[1]), sort="desc").head()

## `/direct-flow-of/{envi-flow}/{tech-flow}`

$$
G[i,j]
$$

In [None]:
result.get_direct_flow_of(envi_flows[42], tech_flows[42])

### `/total-flows-of-one/{tech-flow}`

$$
M[:,j]
$$

In [None]:
envi_values(result.get_total_flows_of_one(tech_flows[1])).head()

### `/total-flow-of-one/{envi-flow}/{tech-flow}`

$$
M[i,j]
$$

In [None]:
result.get_total_flow_of_one(envi_flows[42], tech_flows[42])

### `/total-flows-of/{tech-flow}`

$$
tf[j] * M[:,j]
$$

In [None]:
j = tech_flows[1]
values = result.get_total_flows_of(j)
envi_values(
    list(filter(lambda i: i.envi_flow.flow.ref_unit == "kg", values))
).plot(
    y="Amount",
    ylabel="kg",
    title=f"Total intervention mass flows of: {j.provider.name}",
)
glow()


### `/total-flow-of/{envi-flow}/{tech-flow}`

$$
tf[j] * M[i,j]
$$

In [None]:
result.get_total_flow_of(envi_flows[42], tech_flows[42])

### `/unscaled-flows-of/{tech-flow}`

$$
B[:,j]
$$

In [None]:
# TODO does not exist yet
envi_values(result.get_unscaled_flows_of(tech_flows[42])).head()

## Impact assessment results

### `/total-impacts`

$$
h = C * g
$$

In [None]:
impact_values(result.get_total_impacts())

### `/total-impact-value-of/{impact-category}`

$$
h[k]
$$

In [None]:
result.get_total_impact_value_of(impact_categories[0])

### `/direct-impact-values-of/{impact-category}`

$$
H[k,:]
$$

In [None]:
k = impact_categories[0]
hk = result.get_direct_impact_values_of(k)
tech_values(hk, unit=lambda _: k.ref_unit)["Amount"].plot(
    title=f"Direct impacts of: {k.name}", kind="hist",bins=4
)
glow()


### `/total-impact-values-of/{impact-category}`

$$
N[k,:] * diagm(tf)
$$

In [None]:
df(result.get_total_impact_values_of(impact_categories[0]),
  *tech_values(unit=lambda _: impact_categories[0].ref_unit)).head()

### `/direct-impacts-of/{tech-flow}`

$$
H[:,j]
$$

In [None]:
df(result.get_direct_impacts_of(tech_flows[1]),
  ("Impact category", lambda v: v.impact_category.name),
  ("Amount", lambda v: v.amount),
  ("Unit", lambda v: v.impact_category.ref_unit))

### `/direct-impact-of/{impact-category}/{tech-flow}`

$$
H[k,j]
$$

In [None]:
result.get_direct_impact_of(impact_categories[0], tech_flows[1])

### `/total-impacts-of-one/{tech-flow}`

$$
N[:,j]
$$

In [None]:
df(result.get_total_impacts_of_one(tech_flows[1]),
  ("Impact category", lambda v: v.impact_category.name),
  ("Amount", lambda v: v.amount),
  ("Unit", lambda v: v.impact_category.ref_unit))

### `/total-impact-of-one/{impact-category}/{tech-flow}`

$$
N[k,j]
$$

In [None]:
result.get_total_impact_of_one(impact_categories[0], tech_flows[1])

### `/total-impacts-of/{tech-flow}`

$$
tf[j] * N[:,j]
$$

In [None]:
df(result.get_total_impacts_of(tech_flows[1]),
  ("Impact category", lambda v: v.impact_category.name),
  ("Amount", lambda v: v.amount),
  ("Unit", lambda v: v.impact_category.ref_unit))

### `/total-impact-of/{impact-category}/{tech-flow}`

$$
tf[j] * N[k,j]
$$

In [None]:
result.get_total_impact_of(impact_categories[0], tech_flows[1])

### `/impact-factors-of/{impact-category}`

$$
C[k,:]
$$