# Example for life cycle assessment module of AeroMAPS


## Load and process

First, the user has to load the framework and generate a process.

In [1]:
%matplotlib widget
import pandas as pd
from aeromaps import create_process
from aeromaps.core.models import (
    models_traffic,
    models_efficiency_bottom_up,
    models_energy_without_fuel_effect,
    models_offset,
    models_climate_simple_gwpstar,
    models_sustainability,
)
from aeromaps.models.air_transport.aircraft_fleet_and_operations.fleet.fleet_numeric import FleetEvolution
from aeromaps.models.impacts.life_cycle_assessment.life_cycle_assessment import LifeCycleAssessment

In [None]:
extended_models = {
    "models_traffic": models_traffic,
    "models_efficiency_bottom_up": models_efficiency_bottom_up,
    "models_energy_without_fuel_effect": models_energy_without_fuel_effect,
    "models_offset": models_offset,
    "models_climate_simple_gwpstar": models_climate_simple_gwpstar,
    "models_sustainability": models_sustainability,
    "fleet_numeric": FleetEvolution("fleet_numeric"),
    "life_cycle_assessment": LifeCycleAssessment(
        "life_cycle_assessment",
        reset=False,
    )
}

In [None]:
process = create_process(
    configuration_file="data_files/config.json",
    models=extended_models,
    use_fleet_model=True,
    add_examples_aircraft_and_subcategory=True
)

In [None]:
#process.list_float_inputs()

## Input parameters

ReFuelEU

In [None]:
## Share of alternative fuels in the drop-in fuel mix (the rest being supplemented by kerosene) [%]
process.parameters.biofuel_share_reference_years = [2020, 2025, 2030, 2035, 2040, 2045, 2050]
process.parameters.biofuel_share_reference_years_values = [0, 2, 4.8, 15, 24, 27, 35]
process.parameters.electrofuel_share_reference_years = [2020, 2025, 2030, 2035, 2040, 2045, 2050]
process.parameters.electrofuel_share_reference_years_values = [0, 0, 1.2, 5, 10, 15, 35]

## Compute

Once all the parameters have been set up, the user can compute.

In [7]:
import time
start_time = time.time()
process.compute()
print("--- %s seconds ---" % (time.time() - start_time))

## Results

The user can then display the results. The user has access to float outputs but also to annual data outputs, with the possibility of choosing the output.

In [21]:
#process.data['vector_outputs']

In [22]:
#process.climate_outputs_df

In [8]:
process.lca_outputs_df.T

## Plots

In [9]:
import matplotlib.pyplot as plt
import seaborn as sns
import math
plt.style.use('ggplot')

def plot_stacked_evolution_subplots(df):
    # Remove phases containing 'sum'
    df_filtered = df[~df.index.get_level_values('phase').str.contains('sum')]
    
    methods = df_filtered.index.get_level_values('method').unique()
    years = df_filtered.columns
    
    # Determine the number of rows and columns for the subplots
    n_methods = len(methods)
    n_cols = 2 if n_methods % 2 == 0 else 3
    n_rows = math.ceil(n_methods / n_cols)
    
    # Use seaborn color palette for better aesthetics
    palette = sns.color_palette("Set2", len(df_filtered.index.levels[1]))
    
    # Create subplots
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, n_rows * 4), constrained_layout=True)
    axes = axes.flatten()  # Flatten the array of axes for easy iteration

    for i, method in enumerate(methods):
        df_method = df_filtered.xs(method, level='method')
        #df_method.index = df_method.index.str.replace('_', ' ')
        
        # Plot stacked area chart with custom colors
        axes[i].stackplot(years, df_method, labels=df_method.index, alpha=0.8, colors=palette)
        
        # Customize the subplot
        name, unit = method.split('[', 1)
        name = name.replace('- ', '\n').replace('(', '\n(')
        name = "".join([a if a.isupper() else b for a,b in zip(name,name.title())])
        unit = unit.replace(']', '')
        axes[i].set_title(name, fontsize=12)
        axes[i].set_xlabel('Year')
        axes[i].set_ylabel(unit)
        axes[i].grid(True)
        if i == 0:
            axes[i].legend()
    
    # Remove any empty subplots
    #for j in range(i + 1, len(axes)):
    #    fig.delaxes(axes[j])
    
    # Add a single legend for all subplots
    #handles, labels = axes[0].get_legend_handles_labels()
    #fig.legend(handles, labels, title='Phase', loc='upper center', ncol=4, bbox_to_anchor = (0, 0.05, 1, 1), mode="expand") #, ncol=len(df_filtered.index.levels[1]))
        
    plt.show()

# Call the function
plot_stacked_evolution_subplots(process.lca_outputs_df)

# Draft

In [None]:
from functools import wraps

def kwargs_decorator():
    def wrapper(f):
        @wraps(f)
        def inner_wrapper(*args, **kwargs):
            dict_kwargs = args[0].lca_params
            new_kwargs = {**dict_kwargs, **kwargs}
            return f(*args, **new_kwargs)
        return inner_wrapper
    return wrapper

class Test:
    def __init__(self):
        self.lca_params = {'a': 1.0}
        
    @kwargs_decorator()
    def foo(self, **kwargs):
        print(kwargs)

In [None]:
test = Test()
test.foo()

In [None]:
from functools import wraps
import lca_algebraic as agb

def kwargs_decorator(dict_kwargs):
    def wrapper(f):
        @wraps(f)
        def inner_wrapper(*args, **kwargs):
            # dict_kwargs = args[0].my_dict  # args[0] refer to 'self'
            new_kwargs = {**dict_kwargs, **kwargs}
            return f(*args, **new_kwargs)
        return inner_wrapper
    return wrapper


def args_decorator(list_args):
    def wrapper(f):
        @wraps(f)
        def inner_wrapper(*args, **kwargs):
            # dict_kwargs = args[0].my_dict  # args[0] refer to 'self'
            new_args = {*args, *list_args}
            return f(*new_args, **kwargs)
        return inner_wrapper
    return wrapper


class Test:
        
    @args_decorator(agb.all_params().keys())
    def foo(self, *args, **kwargs):
        return args
    
    
test = Test()
test.__init__()
res = test.foo()
res

In [None]:
from inspect import getfullargspec

getfullargspec(test.foo)

In [None]:
list(agb.all_params().keys())

In [None]:
class Test:
    def init(self):
        self.my_list = ['a', 'b']
    
    def foo(self, *args, **kwargs):
        
        
        def foo2(
        
        return args
    

In [None]:
test = ['q', 1.0]

any(isinstance(elem, str) for elem in test)

In [99]:
agb.findMethods?

In [110]:
import lca_algebraic as agb

agb.findMethods('',mainCat='ReCiPe 2016 v1.03, midpoint (H)')

In [3]:
import brightway2 as bw
from dotenv import load_dotenv
import bw2io
import bw2data
import os
import lca_algebraic as agb

bw2data.projects.set_current('example_methodology_ei391')
db = bw.Database('ecoinvent_cutoff_3.9_remind_SSP2-Base_2020')
#db = bw.Database('biosphere3')

In [10]:
[act for act in db if 'Carbon dioxide' in act['name']]

In [10]:
[act for act in db if 'kerosene production, via Fischer-Tropsch' in act['name']]

In [8]:
act

In [12]:
act.as_dict()

In [None]:
import lca_algebraic as agb
import pandas as pd
from sympy.parsing.sympy_parser import parse_expr
from sympy import Float
import bw2data
from typing import List, Union

USER_DB = 'Foreground DB'
DEFAULT_PROJECT = 'lcav_default_project'


def list_processes(model, foreground_only: bool = True, custom_attribute: str = None) -> pd.DataFrame:
    """
    Traverses the tree of sub-activities (sub-processes) until background database is reached.
    """

    def _recursive_activities(act,
                              activities, units, locations, parents, exchanges, levels, dbs, custom_attributes,
                              parent: str = "", exc: dict = None, level: int = 0):

        if exc is None:
            exc = {}
        name = act.as_dict()['name']
        unit = act.as_dict()['unit']
        loc = act.as_dict().get('location', "")
        if loc not in ['GLO', '']:
            name += f' [{loc}]'
        exchange = _getAmountOrFormula(exc)
        db = act.as_dict()['database']
        custom_attr = act.as_dict().get(custom_attribute, "")  # get any additional attribute asked by the user

        # Stop BEFORE reaching the first level of background activities
        if foreground_only and db != USER_DB:
            return

        activities.append(name)
        units.append(unit)
        locations.append(loc)
        parents.append(parent)
        exchanges.append(exchange)
        levels.append(level)
        dbs.append(db)
        custom_attributes.append(custom_attr)

        # Stop AFTER reaching the first level of background activities
        if db != USER_DB:
            return

        for exc in act.exchanges():
            if exc.input != act:
                _recursive_activities(exc.input, activities, units, locations, parents, exchanges, levels, dbs, custom_attributes,
                                      parent=name,
                                      exc=exc,
                                      level=level + 1)
        return

    # Initialize lists
    activities = []
    units = []
    locations = []
    parents = []
    exchanges = []
    levels = []
    dbs = []
    custom_attributes = []

    # Recursively populate lists
    _recursive_activities(model, activities, units, locations, parents, exchanges, levels, dbs, custom_attributes)
    data = {'activity': activities,
            'unit': units,
            'location': locations,
            'level': levels,
            'database': dbs,
            'parent': parents,
            'exchange': exchanges,
            }
    if custom_attribute:
        data[custom_attribute] = custom_attributes

    # Create DataFrame
    df = pd.DataFrame(data, index=activities)

    return df


def get_parameter(key: str):
    param = agb.params._param_registry().__getitem__(key)
    return param


def expandParams(param, value=None):
    """
    Modified version of expandParams from classes EnumParam and ParamDef from lca_algebraic library.
    For enum (switch) parameters, returns a dictionary of single enum values as sympy symbols,
    with only a single one set to 1.
    For float parameters, returns a dictionary with either the user-provided value or the default parameter value.
    """

    # Enum (e.g. switch) parameters
    if param['type'] == 'enum':
        values = param['values'] + [None]

        # Bad value ?
        if value not in values:
            raise Exception("Invalid value %s for param %s. Should be in %s" %
                            (value, param['name'], str(param['values'])))

        res = dict()
        for enum_val in values:
            var_name = "%s_%s" % (param['name'], enum_val if enum_val is not None else "default")
            res[var_name] = 1.0 if enum_val == value else 0.0
        return res

    # Float parameters
    else:
        if value is None:
            value = param['default']
        return {param['name']: value}


def completeParamValues(params, param_registry, setDefaults=True):
    """
    Modified version of completeParamValues from lca_algebraic library.
    Sets default values for missing parameters and expand enum params.

    Returns
    -------
        Dict of param_name => float value
    """

    # Set default variables for missing values
    if setDefaults:
        for name, param in param_registry.items():
            if not name in params:
                params[name] = param['default']
                agb.warn(
                    "Required param '%s' was missing, replacing by default value : %s" % (name, str(param['default'])))

    res = dict()
    for key, val in params.items():
        if key in param_registry:
            param = param_registry[key]
        else:
            continue
            # raise Exception("Parameter not found : %s. Valid parameters : %s" % (key, list(param_registry.keys())))

        if isinstance(val, list):
            newvals = [expandParams(param, val) for val in val]
            res.update(agb.params._listOfDictToDictOflist(newvals))
        else:
            res.update(expandParams(param, val))

    return res


def format_number(num, precision=2):
    """
    This function takes a number (either a Python float or a SymPy Float) and a precision,
    and returns a string that represents the number with the given precision.
    If the number is too large or too small, it switches to scientific notation.

    :param num: The number to format.
    :param precision: The number of decimal places to use.
    :return: A string that represents the number with the given precision.
    """
    if isinstance(num, Float):  # SymPy float
        num = num.evalf()
    sci_num = "{:.{}e}".format(num, precision)
    if 'e+00' in sci_num:
        return "{:.{}f}".format(num, precision)
    else:
        return sci_num


def _getAmountOrFormula(ex):
    """ Return either a fixed float value or an expression for the amount of this exchange"""
    if 'formula' in ex:
        expr = parse_expr(ex['formula'])
        try:
            float(expr)
            return format_number(float(expr))
        except TypeError:
            return expr
    elif 'amount' in ex:
        return format_number(ex['amount'])
    return ""


def safe_delete_brightway_project(projectname: str) -> None:
    try:
        bw2data.projects.delete_project(
            name = projectname,
            delete_dir = True
        )
    except:
        pass

In [None]:
model = agb.findActivity('model', db_name='Foreground DB')

In [None]:
list_processes(model)