In [21]:
'''
This script is used for Sensiticity Analysis using OpenLCA.
Codes are developed by Ao Chen @Zweistein.
Connect to IPC server in OpenLCA software before running. (Port: 8080)
TODO: if you're running this script in your machine for the first time, run `product_system_reader.ipynb` first!
Search for TODO in the notebook to see how to run this notebook as needed. 
'''

from notebook.services.config import ConfigManager
cm = ConfigManager()
cm.update('notebook', {'highlight_selected_word': {
    'highlight_across_all_cells': True,
    'only_cells_in_scroll': False,
    'delay': 500,
    'code_cells_only': True,
}})

import os
import csv
import copy
import json
import olca
import uuid
import math
import logging
import traceback
import numpy as np
import pandas as pd
from fuzzywuzzy import fuzz
from datetime import datetime, date

from matplotlib import pyplot as plt
import matplotlib.mlab as mlab
from matplotlib import rcParams
import matplotlib.patches as mpatches
import seaborn as sns

params = {'mathtext.default': 'regular' }

client = olca.Client(8080)
client

<olca.ipc.Client at 0x7fea1b68faf0>

In [22]:
def get_all_process(client):
    # Get all process dataset in dataframe
    Process_descriptor = client.get_descriptors(olca.Process)
    Process_list = []
    ID_list = []
    Location_list = []

    for process in Process_descriptor:
        Process_list.append(process.name)
        ID_list.append(process.id)
        Location_list.append(process.location)
    Processes_df = pd.DataFrame(list(zip(Process_list,
                                    ID_list, Location_list)),
                                columns=['name', 'id', 'location'])
    return Processes_df

In [23]:
Processes_df = get_all_process(client)

In [24]:
Processes_df.head()

Unnamed: 0,name,id,location
0,"heat production, heavy fuel oil, at industrial...",4f582f90-d0ae-3552-be43-70c730e82af0,RoW
1,"containerboard production, fluting medium, rec...",fdffa34a-17fa-3676-9941-0a8a2bc5f5ac,RoW
2,"market for wood chips, dry, measured as dry ma...",35ca5293-f067-326c-ae45-920013c734c7,RER
3,"treatment of waste glass, unsanitary landfill,...",ab082ee7-7469-3a1f-8e8f-f71328b9e842,GLO
4,"heat and power co-generation, natural gas, con...",3c7c0ba3-acd4-3e19-be88-8fff9dc87788,SE


In [25]:
def SA(Result_path, process_id="", start_point=0.5, end_point=1.5, step_size=0.05, Impact_method = 'ei -  IPCC 2013 no LT'):
    if not os.path.exists(Result_path):
        os.makedirs(Result_path)
    SA_dict = {}
    field_cache = ['InputFlow', 'GWP100a', "Relative change"]
    if len(process_id) != 0:
        try:
            search_process_list = [Processes_df[Processes_df['id']==process_id].iloc[0]]
        except Exception as e:
            sys.exit("Invalid process id!")

    else:
        sys.exit("No process id is provided!")

    setup = olca.CalculationSetup()
    setup.calculation_type = olca.CalculationType.SIMPLE_CALCULATION # TODO: Can choose SIMPLE_CALCULATION, CONTRIBUTION_ANALYSIS, UPSTREAM_ANALYSIS, REGIONALIZED_CALCULATION, MONTE_CARLO_SIMULATION
    setup.impact_method = client.find(olca.ImpactMethod, Impact_method)
    setup.amount = 1.0

    try:
        SA_dict = json.load(open("SA_record.json", "r"))
    except FileNotFoundError as e:
        json.dump(SA_dict, open("SA_record.json", "w"))

    for i in range(len(search_process_list)):
        result_list_cache = []
        search_process_ID = search_process_list[i]['id']
        search_process_name = search_process_list[i]['name']
        # cached_path = search_process_name.split(' | ')[0]+'_cached_test.csv'
        
        search_process_location = search_process_list[i]['location']
        cached_path = os.path.abspath(Result_path+"/"+search_process_name.split(' | ')[0]+"_"+search_process_location+'_cached.csv')

        process_key =  search_process_name + '|' + search_process_location
        if process_key not in SA_dict.keys():
            SA_dict[process_key] = {}
        
        test_process = client.get(olca.Process, uid=search_process_ID)
        test_input = [exchange for exchange in test_process.exchanges if exchange.input == True]
        for j in range(len(test_input)):
        # for j in range(2): # for test
            initial_value = copy.deepcopy(test_input[j].amount)
            test_amount_range = test_input[j].amount*np.arange(start_point, end_point+step_size, step_size)
            if test_input[j].default_provider is not None:
                SArange_key = test_input[j].default_provider.name+' | '+test_input[j].default_provider.location+' | '+str(start_point)+' | '+str(end_point)+' | '+str(step_size)
                print('input flow: '+test_input[j].default_provider.name+test_input[j].default_provider.location)

            else:
                test_input[j].default_provider = 'No Default Provider'
                print('input flow: '+test_input[j].default_provider)

            if SArange_key not in SA_dict[process_key].keys():
                SA_dict[process_key][SArange_key] = {}
            
            print('initial amount:' + str(initial_value))
            for k in range(len(test_amount_range)):
            # for k in range(2): # for test
                test_input[j].amount = test_amount_range[k]
                print('new amount: '+str(test_amount_range[k]))
                client.update(test_process)
                # if len(SA_dict[process_key][SArange_key]) != len(test_amount_range):
                if test_amount_range[k] not in SA_dict[process_key][SArange_key].keys():
                    try:
                        print('Creating new product system...')
                        product_system = client.create_product_system(search_process_ID, default_providers='prefer', preferred_type='UNIT_PROCESS') #TODO: preferred_type can be changed to "SYSTEM_PROCESS" if needed
                        SA_dict[process_key][SArange_key][test_amount_range[k]] = product_system.id
                        product_system_id = product_system.id
                    except Exception as e:
                        print('Failed to create product system of '+search_process_ID)
                        logging.error(traceback.format_exc())
                        SA_dict[process_key][SArange_key] = {}
                        try:
                            print('Re-Creating new product system...')
                            product_system = client.create_product_system(search_process_ID, default_providers='prefer', preferred_type='UNIT_PROCESS') #TODO: preferred_type can be changed to "SYSTEM_PROCESS" if needed
                            SA_dict[process_key][SArange_key][test_amount_range[k]] = product_system.id
                            product_system_id = product_system.id
                        except Exception as e:
                            print('Failed to create product system of '+search_process_ID)
                            logging.error(traceback.format_exc())
                            
                    SA_file = open("SA_record.json", "w")
                    json.dump(SA_dict, SA_file)
                    SA_file.close()
                    
                else:
                    print('Product systems have been created for this process and this range amount.')
                    product_system_id = SA_dict[process_key][SArange_key][test_amount_range[k]]
                
                try:
                    print('Calculating...')
                    setup.product_system = client.get(olca.ProductSystem, uid=product_system_id)
                    result = client.calculate(setup)
                    # client.excel_export(calc_result, 'calc_result/'+product_system_locations[j]+'_'+'calc_result.xlsx')
                    if result.impact_results is not None:
                        for m in range(len(result.impact_results)):
                            GWP_result = result.impact_results[m]
                            if 'GWP 100a' in GWP_result.impact_category.name:
                                GWP100a_value = GWP_result.value
                        if test_input[j].default_provider is not None:
                            result_list_cache.append([test_input[j].default_provider.name+test_input[j].default_provider.location, GWP100a_value, test_amount_range[k]/initial_value])
                        else:
                            result_list_cache.append([test_input[j].default_provider, GWP100a_value, test_amount_range[k]/initial_value])
                        with open(cached_path, 'w', newline="") as f:
                            write = csv.writer(f, delimiter=";")
                            write.writerow(["Process name: ", search_process_name+" | "+search_process_location])
                            write.writerow(field_cache)
                            write.writerows(result_list_cache)
                            print('Results cached.')
                    else:
                        print('No impact results: '+process_key)
                except Exception as e:
                    print('Failed to calculate product system: '+product_system_id)
                    logging.error(traceback.format_exc())
                    
                test_input[j].amount = initial_value
                client.update(test_process)
                print('input amount recovered to initial value.')
                
                print('Remaining number of SA variations: '+str(len(test_amount_range)-k-1))
            
            print('Remaining number of input flows to be varied: '+str(len(test_input)-j-1))

In [26]:
def check_impact_method(Impact_method):
    ImpactMethods = client.get_descriptors(olca.ImpactMethod)
    Impact_method_list = []
    for ImpactMethod in ImpactMethods:
        Impact_method_list.append(ImpactMethod.name)
    if Impact_method not in Impact_method_list:
        print('Invalid impact method provided!')
        print('You may want to use:')
        for item in Impact_method_list:
            if fuzz.partial_ratio(Impact_method, item)>50:
                print(item)
        return False
    else:
        return True

In [33]:
process_id = "0416620f-b83a-348c-906c-1b508eee0d37" #TODO: change process id as needed. 
start_point=0.5 # TODO: change start point of SA as needed
end_point=1.5 # TODO: change end point of SA as needed
step_size=0.05 # TODO: change step size of SA as needed
Impact_method = 'ei -  IPCC 2013' # TODO: change impact method as needed.
# TODO: Check out the impact method's name in OpenLCA to ensure it's right.
Today = date.today().strftime("%d_%b_%Y") # day, month, year
Result_path = Today+'_SA' # TODO: change result path as you need
Is_exist_impact_method = check_impact_method(Impact_method)
if Is_exist_impact_method:
    SA(Result_path=Result_path, process_id=process_id, start_point=start_point, end_point=end_point, step_size=step_size, Impact_method=Impact_method)

Invalid impact method provided!
You may want to use:
IPCC 2013 no LT
IPCC 2013
