# Uncertainty and global sensitivity analysis of LCA

Author: [Aleksandra Kim](https://github.com/aleksandra-kim/)

Brightway 2.5 Autumn school

Grosshöchstetten, Switzerland, October 2022

# Global sensitivity analysis of LCA

<img src="input_data/gsa.png" width=1600 />

# LCA case study

- Environmental impacts of lithium carbonate production from Chaerhan brines. 
- Foreground inventories from [Schenker et al.](https://doi.org/10.1016/j.resconrec.2022.106611)
- Uncertainty quantification in inputs is provided.
- Numerically uncertainty propagation with Monte Carlo.
- We will conduct uncertainty analysis ...
- ... as well as global sensitivity analysis.
- But GSA only for the foreground system in the interest of time.

# Flowchart for the foreground inventories

<img src="input_data/lithium_lci.png" width=800 />

In [None]:
# === Switch to kernel bw25! ===

# Brightway libraries
import bw2data as bd
import bw2io as bi
import bw2calc as bc
import bw_processing as bwp

# General libraries
import numpy as np
import pandas as pd
from fs.zipfs import ZipFS
import json                        # Library for working with json files
from pathlib import Path           # Library for working with paths in different OS     
import matplotlib.pyplot as plt    # Library for creating plots

# 1. Setup BW project and databases

In [None]:
# Let's first create directories where we will be saving results
output_dir = Path(f"output_data")
output_dir.mkdir(exist_ok=True, parents=True)

# BW project
project = "Lithium GSA"
bd.projects.set_current(project)
li_name = "Chaerhan_38"

## Option 1 - import databases from scratch

In [None]:
# %%time

# # === Import biosphere ===
# bi.bw2setup()

# # === Import ecoinvent ===
# ei_path = "/Users/akim/Documents/LCA_files/ecoinvent_38_cutoff/datasets"
# ei_name = "ecoinvent 3.8 cutoff"
# if ei_name in bd.databases:
#     print("ecoinvent database already exists")
# else:
#     ei = bi.SingleOutputEcospold2Importer(ei_path, ei_name)
#     ei.apply_strategies()
#     assert ei.all_linked
#     ei.write_database()
    
# # === Import foreground databases ===
# # == Water database ==
# if "Water_38" in bd.databases:
#     print("Water_38 database already exists")
# else:
#     # 1. Specify filepath to your foreground inventories.
#     water_path = "/srv/data/Water_database_38.xlsx"
#     # 2. Create an instance of a class that contains basic methods for importing a database from an excel file.
#     water = bi.ExcelImporter(water_path)  
#     # 3. `apply_strategies` is one of such basic methods, it makes sure units, locations, etc are in correct format.
#     water.apply_strategies()
#     # 4. Next step is to link your foreground exchanges to existing databases by matching relevant exchanges fields.
#     water.match_database("biosphere3", fields=("name", "unit", "categories"))
#     water.match_database("ecoinvent 3.8 cutoff", fields=("name", "location", "unit"))
#     water.metadata.pop(None)  # Remove metadata None entry. TODO
#     # 5. If everything is linked, write database so that it is saved in your project.
#     if water.all_linked:
#         water.write_database()
        
# # == Lithium database ==
# # Do the same steps for the chosen Lithium database
# if li_name in bd.databases:
#     print(f"{li_name} database already exists")
# else:
#     lithium_path = f"/srv/data/Chaerhan_database_38.xlsx"
#     lithium = bi.ExcelImporter(lithium_path)  
#     lithium.apply_strategies()
#     lithium.match_database("biosphere3", fields=("name", "unit", "categories"))
#     lithium.match_database("ecoinvent 3.8 cutoff", fields=("name", "location", "unit")) 
#     # We also need to link Lithium database to the Water database
#     lithium.match_database("Water_38", fields=("name", "location", "unit")) 
#     lithium.metadata.pop(None)
#     if lithium.all_linked:
#         lithium.write_database()

# # === Project backup ===
# # To backup a project use this command. Note that it will save project in your home directory!
# bi.backup_project_directory(project)

## Option 2 - restore existing project

In [None]:
bi.restore_project_directory(f"/srv/data/brightway2-project-{project}-backup.25-October-2022-10-23PM.tar.gz")
bd.projects.set_current(project)

In [None]:
bd.databases

# 2. Deterministic LCIA score

In [None]:
# 1. Let's choose product for LCA
lithium = bd.Database(li_name)
demand_act = [act for act in lithium if "Rotary dryer" in act['name']][0]
functional_unit = {demand_act: 1}  # functional unit

# 2. And impact assessment method
method = ('IPCC 2013', 'climate change', 'GWP 100a')

# 3. Create `lca` object that contains necessary methods for LCI and LCIA 
lca = bc.LCA(functional_unit, method)

# 4. Solve life cycle inventory problem
lca.lci()

# 5. Compute LCIA score
lca.lcia()

# 6. Returns the score, i.e. the sum of the characterized inventory
deterministic_score = lca.score

deterministic_score, bd.Method(method).metadata['unit']

# 3. Uncertainty analysis

Propagate uncertainties in exchanges to LCIA scores. We use Brightway 25 datapackages for that purpose.

## Consider 3 cases

In [None]:
me = bd.Method(method).datapackage()
bs = bd.Database("biosphere3").datapackage()
ei = bd.Database("ecoinvent 3.8 cutoff").datapackage()
wa = bd.Database("Water_38").datapackage()
li = bd.Database(li_name).datapackage()

# Filtered datapackages that do not contain uncertainties
ei_nounct = ei.exclude({"kind": "distributions"})
wa_nounct = wa.exclude({"kind": "distributions"})
li_nounct = li.exclude({"kind": "distributions"})

# Case 1: uncertainties are present in both background and foreground
dps_fb = [me, bs, ei, wa, li]

# Case 2: uncertainties are present ONLY in FOREground
dps_fg = [me, bs, ei_nounct, wa, li]

# Case 3: uncertainties are present ONLY in BACKground
dps_bg = [me, bs, ei, wa_nounct, li_nounct]

cases = {
    "foreground_background": {"datapackages": dps_fb}, 
    "foreground": {"datapackages": dps_fg}, 
    "background": {"datapackages": dps_bg}
}

## Run Monte Carlo simulations for the three cases

In [None]:
%%time

iterations_mc = 1000
seed_mc = 5555  # specifying random seeds is needed for reproducibility of results

# Run MC simulations for all cases
for case, dict_ in cases.items():
    print(case)
    dps = dict_["datapackages"]
    lca_mc = bc.LCA(
        {demand_act.id: 1}, 
        data_objs=dps,
        use_distributions=True, 
        seed_override=seed_mc,
    )
    lca_mc.lci()
    lca_mc.lcia()

    # Since MC takes time, make sure to save the results!
    # Also, read lca_scores from file if you already computed them once
    fp_mc = output_dir / f'lca_scores_{seed_mc}_{iterations_mc}_{case}.json'

    if fp_mc.exists():
        # Read LCIA scores
        with open(fp_mc, 'r') as f:
            lca_scores_mc = json.load(f)
    else:
        # Run Monte Carlo simulations
        lca_scores_mc = []
        for i in range(iterations_mc):
            next(lca_mc)
            lca_scores_mc.append(lca_mc.score)
        # Save LCIA scores
        with open(fp_mc, 'w') as f:
            json.dump(lca_scores_mc, f)
     
    dict_['lca_scores_mc'] = lca_scores_mc
    

## Plot distribution of LCIA scores

TL;DR:  Use plotly instead of matplotlib!

Long version:  These are not most beautiful plots, I highly recommend using [plotly package](https://plotly.com/python/) for visualizations. I believe, it is more intuitive and logical than matplotlib, but more importantly it allows interactive plots, where one can zoom in and out, hide some parts of plots, see what are the exact values of datapoints, etc. In this exercise I couldn't use plotly, because we are running notebooks on the servers.

In [None]:
num_bins = 100
lca_scores_mc_all = np.hstack([dict_['lca_scores_mc'] for dict_ in cases.values()])
lca_scores_mc_all = lca_scores_mc_all[
    np.logical_and(
        lca_scores_mc_all > np.percentile(lca_scores_mc_all, 0),
        lca_scores_mc_all < np.percentile(lca_scores_mc_all, 99),
    )
]
bins = np.linspace(min(lca_scores_mc_all), max(lca_scores_mc_all), num_bins, endpoint=True)
midbins = (bins[:-1] + bins[1:]) / 2
width = (bins[1]-bins[0])*0.8

plt.figure(figsize=(10, 6), dpi=80)
plt.plot(
    [deterministic_score],
    [0],
    "rx",
    label="Deterministic LCIA score",
)

# Uncertainty distributions for all cases
for case, dict_ in cases.items():
    lca_scores_mc = dict_['lca_scores_mc']
    freq, _ = np.histogram(lca_scores_mc, bins=bins, density=False)
    plt.bar(midbins, freq, width=width, label=case, alpha=0.6)

plt.xlabel("LCIA scores, kg CO2-eq")
plt.ylabel("Count")
plt.legend(loc="upper right")

# 4. Global sensitivity analysis for the foreground system

There are a lot more uncertainties in the foreground, so it's alright to do GSA for the foreground only.

GSA means that we need to compute quantitative measures of input importances for all model inputs. These measures are known as sensitivity indices.

<img src="input_data/gsa_indices.png" width=400 style="float:left"/>

## Create `X` matrix

In [None]:
from matrix_utils.resource_group import FakeRNG

def create_X(lca_obj, nsamples, matrix_type, name, seed):
    dp = bwp.create_datapackage(
        name=name,
        seed=seed,
        sequential=True,
    )
    
    num_resources = 3
    if matrix_type == "technosphere":
        num_resources = 4

    obj = getattr(lca_obj, f"{matrix_type}_mm")

    indices_array = np.hstack([
        group.package.data[0] for group in obj.groups
        if (not isinstance(group.rng, FakeRNG)) and (not group.empty) and (len(group.package.data) == num_resources)
    ])

    mask = np.ones(len(indices_array), dtype=bool)

    data = []
    np.random.seed(seed)
    for _ in range(nsamples):
        next(obj)
        idata = []
        for group in obj.groups:
            if (not isinstance(group.rng, FakeRNG)) and (not group.empty):
                idata.append(group.rng.random_data)
        data.append(np.hstack(idata)[mask])
    data_array = np.vstack(data).T
    
    if matrix_type == "technosphere":
        flip_array = np.hstack([
            group.flip for group in obj.groups
            if (not isinstance(group.rng, FakeRNG)) and (not group.empty) and (len(group.package.data) == num_resources)
        ])
        dp.add_persistent_array(
            matrix=f"{matrix_type}_matrix",
            data_array=data_array,
            name=name,
            indices_array=indices_array[mask],
            flip_array=flip_array[mask],
        )
    else:
        dp.add_persistent_array(
            matrix=f"{matrix_type}_matrix",
            data_array=data_array,
            name=name,
            indices_array=indices_array[mask],
        )
    return dp

In [None]:
# Create X matrix
dps_foreground = cases["foreground"]['datapackages']  # only foreground should vary
iterations_gsa = 1000
seed_gsa = 7777

lca_gsa = bc.LCA(
    {demand_act.id: 1}, 
    data_objs=dps_foreground,
    use_distributions=True,
    seed_override=seed_gsa,
)
lca_gsa.lci()
lca_gsa.lcia()

dp_name_gsa = "water_chaerhan"
dp_gsa_tech = create_X(lca_gsa, iterations_gsa, "technosphere", dp_name_gsa, seed_gsa)
dp_gsa_bio = create_X(lca_gsa, iterations_gsa, "biosphere", dp_name_gsa, seed_gsa)

## Run MC simulations, in other words - compute `y`

In [None]:
%%time

gsa_path = output_dir / f'lca_scores_{seed_gsa}_{iterations_gsa}_gsa.json'

if gsa_path.exists():
    # Read LCIA scores
    with open(gsa_path, 'r') as f:
        lca_scores_gsa = json.load(f)
else:
    # Run Monte Carlo simulations
    dps_gsa = [me, bs, ei_nounct, wa_nounct, li_nounct, dp_gsa_tech, dp_gsa_bio]
    lca_gsa = bc.LCA(
        {demand_act.id: 1}, 
        data_objs=dps_gsa,
        use_distributions=False, 
        use_arrays=True,
    )

    lca_gsa.lci()
    lca_gsa.lcia()
    lca_scores_gsa = [lca_gsa.score]
    for i in range(iterations_gsa-1):
        next(lca_gsa)
        lca_scores_gsa.append(lca_gsa.score)
    # Save LCIA scores
    with open(gsa_path, 'w') as f:
        json.dump(lca_scores_gsa, f)

## Compute GSA indices

In [None]:
from scipy.stats import spearmanr
import pandas as pd

# Let's collect X and y values
X = np.vstack([dp_gsa_tech.data[1], dp_gsa_bio.data[1]]).T
y = np.array(lca_scores_gsa)

### First, check linearity of the model to decide which GSA method to choose

Use standardized regression coefficients (SRC) to understand which GSA method is suitable.

If sum of SRC is close to 1, then the model is approximately linear.

In [None]:
from sklearn.linear_model import LinearRegression

train_test_ratio = 0.8
split = int(train_test_ratio*iterations_gsa)
Xtrain = X[:split, :]
ytrain = y[:split]
Xtest = X[split:, :]
ytest = y[split:]

reg = LinearRegression().fit(Xtrain, ytrain)
# High reg.score in linear regression means that the model performs well on the train and test data.
print(reg.score(Xtrain, ytrain), reg.score(Xtest, ytest))

stdX = np.std(X, axis=0)
stdy = np.std(y)

src_coef = reg.coef_ * stdX / stdy
src_sum = sum((src_coef)**2)
src_sum

### Since this LCA model is somewhat linear, compute Spearman correlations

These are GSA indices appropriate for the linear case.

In [None]:
spearman = []
for x in X.T:
    if len(set(x)) != 1:
        spearman.append(spearmanr(x, y)[0])
    else:
        spearman.append(0)
spearman = np.array(spearman)

## Save GSA results in a pandas dataframe

In [None]:
inds = np.hstack([dp_gsa_tech.data[0], dp_gsa_bio.data[0]])
data = np.vstack([dp_gsa_tech.data[1], dp_gsa_bio.data[1]]).T

# Sort inputs (exchanges) by importance. Higher spearman correlation values mean higher importance of an input.
spearman_argsort = np.argsort(spearman)[-1::-1]
inds_ranked = inds[spearman_argsort]

row_act_names, row_act_locations, row_act_categories = [], [], [] 
col_act_names, col_act_locations, static_data = [], [], []
for i in inds:
    row_act = bd.get_activity(i['row'])
    col_act = bd.get_activity(i['col'])
    row_act_names.append(row_act['name'])
    row_act_locations.append(row_act.get("location", "None"))
    row_act_categories.append(row_act.get("category", "None"))
    col_act_names.append(col_act['name'])
    col_act_locations.append(col_act.get("location", "None"))
    for exc in col_act.exchanges():
        if row_act.id == exc.input.id:
            static_data.append(exc.amount)
            break
static_data = np.array(static_data)

dict_ = {
    "input_names": row_act_names,
    "input_locations": row_act_locations,
    "input_categories": row_act_categories,
    "output_names": col_act_names,
    "output_locations": col_act_locations,
    "spearman": spearman,
}

df = pd.DataFrame.from_dict(dict_)
df.sort_values(by="spearman", axis=0, ascending=False, inplace=True)
df.reset_index(inplace=True)
df.to_excel(output_dir / "GSA_results.xlsx")
df

# 5. Plot distributions of important inputs

Could be helpful for interpreting GSA results.

In [None]:
from scipy.stats import lognorm, triang
import plotly.graph_objects as go

def plot_distribution(exchange, samples, num_bins=60):
    bin_min = np.percentile(samples, 1)   # Remove outliers
    bin_max = np.percentile(samples, 99)  # Remove outliers
    bins = np.linspace(bin_min, bin_max, num_bins+1, endpoint=True)
    width = (bins[1]-bins[0])*0.8
    midbins = (bins[1:]+bins[:-1])/2
    
    plt.figure(figsize=(10, 6), dpi=80)

    # Plot distribution samples
    Y_samples, _ = np.histogram(samples, bins=bins, density=True)
    plt.bar(
        midbins,
        Y_samples,
        width=width,
        label="Samples",
        alpha=0.7,
    )
    # Plot defined pdf of the distribution
    if exchange.uncertainty_type.id == 2:
        loc, scale = exchange['loc'], exchange['scale']
        Y_distr = lognorm.pdf(midbins, s=scale, scale=np.exp(loc))
    elif exchange.uncertainty_type.id == 5:
        mode, min_, max_ = exchange['loc'], exchange['minimum'], exchange['maximum']
        c = (mode-min_)/(max_-min_)
        loc = min_
        scale = max_-min_
        Y_distr = triang.pdf(midbins, c=c, loc=loc, scale=scale)
    plt.plot(
        midbins,
        Y_distr,
        "r-",
        label="Defined distribution",
    )
    plt.title(exchange['name'])
    plt.xlabel(f"Exchange amount, {exchange.unit}")
    plt.ylabel("Frequency")
    plt.legend(loc="upper right")
    plt.show()

In [None]:
i = 4  # You can select here an input you are interested in

data_ranked = data[:,spearman_argsort]

col_id = inds_ranked[i]['col']
row_id = inds_ranked[i]['row']
exchanges = list(bd.get_activity(col_id).exchanges())
for exc in exchanges:
    if exc.input.id == row_id:
        break  
samp = data_ranked[:,i]
plot_distribution(exc, samp)

# 6. Validation of GSA results

How do we know that GSA results are correct? Unfortunately, this step is often ignored.

<img src="input_data/validation.png" width=500 style="float:left"/>

In [None]:
from copy import deepcopy

iterations_val = 200

# === All foreground inputs vary ===

# Here we will reuse LCA scores obtained for GSA
all_path = output_dir / f"lca_scores_{seed_gsa}_{iterations_gsa}_gsa.json"        
with open(all_path, 'r') as f:
    lca_scores_all = json.load(f)
lca_scores_all = np.array(lca_scores_all)[:iterations_val]

        
# === Only influential foreground inputs vary ===

ninfs = np.arange(1, 31, 3)
metric = []

for ninf in ninfs:
    
    print(ninf)

    # Create validation datapackage when only influential inputs vary
    mask_inf = spearman <= spearman[spearman_argsort[ninf]]
    data_inf = deepcopy(data)
    data_inf[:,mask_inf] = static_data[mask_inf]

    ntech = len(dp_gsa_tech.data[0])
    name = "validation"
    
    dp_val_inf = bwp.create_datapackage(
        name=name,
        sequential=True,
    )
    dp_val_inf.add_persistent_array(
        matrix=f"technosphere_matrix",
        data_array=data_inf[:, :ntech].T,
        name=f"{name}_tech",
        indices_array=dp_gsa_tech.data[0],
        flip_array=dp_gsa_tech.data[2],
    )
    dp_val_inf.add_persistent_array(
        matrix=f"biosphere_matrix",
        data_array=data_inf[:, ntech:].T,
        name=f"{name}_bio",
        indices_array=dp_gsa_bio.data[0],
    )
    dps_inf = [me, bs, ei_nounct, wa_nounct, li_nounct, dp_val_inf]

    # Run MC
    lca_inf = bc.LCA(
        {demand_act.id: 1}, 
        data_objs=dps_inf,
        use_distributions=False, 
        use_arrays=True,
    )
    lca_inf.lci()
    lca_inf.lcia()
    
    inf_path = output_dir / f"lca_scores_{seed_gsa}_{iterations_val}_validation_inf_{ninf}.json"
    if inf_path.exists():
        # Read LCIA scores
        with open(inf_path, 'r') as f:
            lca_scores_inf = json.load(f)
    else:
        lca_scores_inf = [lca_inf.score]
        for i in range(iterations_val-1):
            next(lca_inf)
            lca_scores_inf.append(lca_inf.score)
        # Save LCIA scores
        with open(inf_path, 'w') as f:
            json.dump(lca_scores_inf, f)
            
    metric.append(spearmanr(lca_scores_inf, lca_scores_all)[0])

In [None]:
plt.figure(figsize=(10, 6), dpi=80)
plt.plot(
    ninfs,
    metric,
    ".-",
    label="Spearman",
)

plt.title("GSA validation curve")
plt.xlabel("# influential inputs")
plt.ylabel("Spearman correlation R(Yall, Yinf)")

## Validation plot for a specific number of influential inputs

In [None]:
ninf = 16

iterations_plot = 50

# Read LCIA scores
inf_path = output_dir / f"lca_scores_{seed_gsa}_{iterations_val}_validation_inf_{ninf}.json"
with open(inf_path, 'r') as f:
    lca_scores_inf = json.load(f)
    
plt.figure(figsize=(10, 6), dpi=80)

plt.plot(
    np.arange(iterations_plot),
    lca_scores_all[:iterations_plot],
    ".-",
    label="All inputs vary",
)

plt.plot(
    np.arange(iterations_plot),
    lca_scores_inf[:iterations_plot],
    "x-",
    label="Only influential inputs vary",
)

plt.title("GSA validation")
plt.xlabel("Iteration")
plt.ylabel("LCIA scores, kg CO2-eq")
plt.legend(loc="upper right")