## PM and NOx emissions 

This script calculates the life cycle PM and NOx emissions. 

Import Brightway and necessary packages: 

In [58]:
from brightway2 import *

In [59]:
import numpy as np  
import bw2analyzer as bw2a
import pandas as pd 
%run initialize_notebook.ipynb
%matplotlib inline
#plt.style.use('seaborn')

In [60]:
import functools


@functools.lru_cache(512)
def faster_get(key):
    return get_activity(key)


class FlowGetter:
    def __init__(self, lca):
        _, _, self.rb = lca.reverse_dict()

    def direct(self, lca, activity):
        col = lca.activity_dict[activity]
        matrix = lca.characterized_inventory[:, col].tocoo()
        return {
            faster_get(self.rb[row]): {
                'score': lca.characterized_inventory[row, col],
                'amount': lca.inventory[row, col]
            }
            for row in matrix.row
        }

    def cumulative(self, lca, activity):
        col = lca.activity_dict[activity]
        matrix = np.array(lca.characterized_inventory.sum(axis=1)).ravel()
        return {
            faster_get(self.rb[row]): {
                'score': matrix[row] - lca.characterized_inventory[row, col],
                'amount': lca.inventory[row, :].sum() - lca.inventory[row, col]
            }
            for row, value in enumerate(matrix)
            if float(value) != 0
        }



exists = lambda dct: {k: v for k, v in dct.items() if v is not None}


def recursive_calculator(lca, activity, amount, total_score, flow_getter, depth=0, max_depth=7, cutoff=0.01):
    if depth >= max_depth:
        return

    lca.redo_lcia({activity: amount})
    if lca.score < total_score * cutoff:
        return

    return {
        'amount': amount,
        'activity': activity,
        'direct score': float(lca.characterized_inventory[
            :, lca.activity_dict[activity]
        ].sum()),
        'cumulative score': lca.score,
        'flows': {
            'direct': flow_getter.direct(lca, activity),
            'cumulative': flow_getter.cumulative(lca, activity)
        },
        'depth': depth,
        'chain': [recursive_calculator(lca, exc.input, exc['amount'] * amount, total_score, 
                                       flow_getter, depth + 1, max_depth=max_depth, cutoff=cutoff)
                  for exc in activity.technosphere()],
    }


def traverse_supply_chain(activity, amount, method, max_depth, cutoff):
    lca = LCA({activity: amount}, method)
    lca.lci()
    lca.lcia()
    return recursive_calculator(
        lca,
        activity,
        amount,
        lca.score,
        FlowGetter(lca),
        max_depth=max_depth,
        cutoff=cutoff
    )

In [61]:
def max_depth(input_dic):
    for x in input_dic["chain"]:
        if x is not None:
            return max(x["depth"], max_depth(x))
        else: 
            input_dic["depth"]

In [62]:
def get_values_with_depth(key, d):
    result = [(d[key], d['depth'])]
    for ds in d['chain']:
        if ds is not None:
            result.extend(get_values_with_depth(key, ds))
    return result

In [63]:
def gen_dict_extract_1key(key1, d):
    return {
    d[key1]: d['cumulative score'] + d['direct score'],
    'depth': d['depth'],
    'chain': [gen_dict_extract_1key(key1, ds) for ds in d['chain'] if ds is not None]
  }

In [64]:
def gen_dict_extract_1key_amt(key1, d):
    return {
    d[key1]: d['cumulative score'] + d['direct score'],
    'depth': d['depth'],
    'amount': d['amount'],
    'chain': [gen_dict_extract_1key_amt(key1, ds) for ds in d['chain'] if ds is not None]
  }

In [65]:
def gen_dict_extract_2key(key1, key2, d):
    return {
    d[key1]: d['cumulative score'] + d['direct score'],
    'depth': d['depth'],
    'chain': [gen_dict_extract_2key(key1, key2, ds) for ds in d['chain'] if ds is not None],
    key2: max(get_values_with_depth(key2, d), key=lambda x: x[1])[0]
  }

In [66]:
def gen_dict_extract_2key_cumulative(key1, key2, d):
    return {
    d[key1]: d['cumulative score'], 
    'depth': d['depth'],
    'chain': [gen_dict_extract_2key(key1, key2, ds) for ds in d['chain'] if ds is not None],
    key2: max(get_values_with_depth(key2, d), key=lambda x: x[1])[0]
  }

In [67]:
def gen_dict_extract_2key_direct(key1, key2, d):
    return {
    d[key1]: d['direct score'], 
    'depth': d['depth'],
    'chain': [gen_dict_extract_2key(key1, key2, ds) for ds in d['chain'] if ds is not None],
    key2: max(get_values_with_depth(key2, d), key=lambda x: x[1])[0]
  }

In [68]:
%run initialize_notebook.ipynb

## Calculating life-cycle air pollutant emissions per km 

In [69]:
def contribution_air_poll(car, method):    

    # Do lca and traversing the supply chain
    lca = LCA({car: 1}, method)
    lca.lci()
    lca.lcia()
    lca.score
    
    if method == ('ReCiPe Midpoint (H)', 'particulate matter formation', 'PMFP'):
        res_chain = traverse_supply_chain(car, 1, method, max_depth=2, cutoff=0.001)
    if method == ('ReCiPe Midpoint (H)', 'photochemical oxidant formation', 'POFP'): 
        res_chain = traverse_supply_chain(car, 1, method, max_depth=2, cutoff=0.003)
    
    # Defining lists
    pm25, pm10, nox, so2, nh3, df_list, nmvoc = ([] for i in range(7))


    # Creates the number of processes in the chain
    processes = (get_values_with_depth("activity", res_chain))
    
    for process in np.arange(0,len(processes)-1):
        #Picking a given foreground process
        a = gen_dict_extract_2key("activity", "flows", res_chain)['chain'][process]['flows']

        mydict = a['cumulative']

        if method ==  ('ReCiPe Midpoint (H)', 'particulate matter formation', 'PMFP'): 
            nox_dict = {k: v for k, v in mydict.items() if 'Nitrogen oxides' in k['name']}
            pm25_dict = {k: v for k, v in mydict.items() if 'Particulates, < 2.5 um' in k['name']}
            pm10_dict = {k: v for k, v in mydict.items() if 'Particulates, > 2.5 um' in k['name']}
            so2_dict = {k: v for k, v in mydict.items() if 'Sulfu' in k['name']}
            nh3_dict = {k: v for k, v in mydict.items() if 'Ammon' in k['name']}
            
            
            
            pm25.append(sum([pm25_dict[n]['amount'] for n in pm25_dict]))
            
            pm10.append(sum([pm10_dict[n]['amount'] for n in pm10_dict]))
            nox.append(sum([nox_dict[n]['amount'] for n in nox_dict]))
            
            so2.append(sum([so2_dict[n]['amount'] for n in so2_dict]))
            
            nh3.append(sum([nh3_dict[n]['amount'] for n in nh3_dict]))
            

        elif method == ('ReCiPe Midpoint (H)', 'photochemical oxidant formation', 'POFP'):
            nox_dict = {k: v for k, v in mydict.items() if 'Nitrogen oxides' in k['name']}
            nox.append(sum([nox_dict[n]['amount'] for n in nox_dict]))
            nmvoc_dict = {k: v for k, v in mydict.items() if 'NMVOC' in k['name']}
            nmvoc.append(sum([nmvoc_dict[n]['amount'] for n in nmvoc_dict]))
        else: 
            print('function only valid for PMFP or POFP')
            
    if method == ('ReCiPe Midpoint (H)', 'particulate matter formation', 'PMFP'): 
        df_list = [pm25, pm10, nox, so2, nh3]
        index_list = ['PM2.5', 'PM10', 'NOx', 'SO2', 'NH3']
        direct_pm25 = {k: v for k, v in res_chain['flows']['direct'].items() if 'Particulates' in k['name']}
        direct_so2 = {k: v for k, v in res_chain['flows']['direct'].items() if 'Sulfu' in k['name']}
        direct_nox = {k: v for k, v in res_chain['flows']['direct'].items() if 'Nitrogen oxides' in k['name']}
        direct_nh3 = {k: v for k, v in res_chain['flows']['direct'].items() if 'Ammon' in k['name']}
        
        pm25.append((sum([direct_pm25[n]['amount'] for n in direct_pm25])))
        nox.append((sum([direct_nox[n]['amount'] for n in direct_nox])))
        so2.append((sum([direct_so2[n]['amount'] for n in direct_so2])))
        nh3.append((sum([direct_nh3[n]['amount'] for n in direct_nh3])))
    else: 
        df_list = [nox, nmvoc]
        index_list = ['NOx', 'NMVOC']
        direct_nox = {k: v for k, v in res_chain['flows']['direct'].items() if 'Nitrogen oxides' in k['name']}
        direct_nmvoc = {k: v for k, v in res_chain['flows']['direct'].items() if 'NMVOC' in k['name']}
        nox.append((sum([direct_nox[n]['amount'] for n in direct_nox])))
        nmvoc.append((sum([direct_nmvoc[n]['amount'] for n in direct_nmvoc])))
        
    #Assuming that all direct pm from combustion in <2,5 
    
    
    
    columns = [processes[i][0]['name'] for i in np.arange(len(processes))][1:len(processes)]
    columns.append('direct')
    return df_list,index_list,columns
    

In [70]:
#Creating dataframe with air pollutant emissions per functional unit (1km) 
def create_df(list_of_cars, method): 
    df_list = list()
    
    for car in list_of_cars: 
        res = contribution_air_poll(car, method)
        df_list.append(pd.DataFrame(res[0], columns = res[2], index = res[1]))
    if "euro" in str(list_of_cars) and "euro 0" not in str(list_of_cars):     
        keys_df = [x['name'].split()[3]+x['name'].split()[4]+x['name'].split()[5]+x['name'].split()[6] for x in list_of_cars]
        
    elif "euro 0" in str(list_of_cars) and "pre" not in str(list_of_cars):
        keys_df = [str(x['name']) for x in list_of_cars]
    elif "pre euro 0" in str(list_of_cars): 
        keys_df = [str(x['name']) for x in list_of_cars]
        
    else: 
        keys_df = [x['name'].split()[4]+x['name'].split()[5] for x in list_of_cars] 
    return pd.concat(df_list, keys = keys_df)
    #return df_list

In [71]:
def agg_df(df,marginal):
    
    #Have to define the correct el mix
    if marginal == "yes": 
        fuel_cols = ['market for electricity, low voltage, marginal']
    if marginal == "no": 
        fuel_cols = ['market for electricity, low voltage, 2018']
        
    #if BEV and PMF
    if "BEV" in str(df.index.tolist()[0]):
        if "PM" in str(df.index.tolist()[0]): 
            prod_cols = ['Battery BoP', 'Battery cell',
           'market for charger, electric passenger car',
           'market for converter, for electric passenger car',
           'market for electric motor, electric passenger car',
           'market for glider, passenger car',
           'market for internal combustion engine, passenger car',
           'market for inverter, for electric passenger car',
           'market for power distribution unit, for electric passenger car']

            direct_cols = ['market for road wear emissions, passenger car',
           'market for tyre wear emissions, passenger car', 
            'market for brake wear emissions, passenger car', 
            'direct']
        if "NOx" in str(df.index.tolist()[0]): 
            prod_cols = ['Battery BoP', 'Battery cell',
           'market for charger, electric passenger car',
           'market for converter, for electric passenger car',
           'market for electric motor, electric passenger car',
           'market for glider, passenger car',
           'market for internal combustion engine, passenger car',
    
           'market for inverter, for electric passenger car',
           'market for power distribution unit, for electric passenger car']

            direct_cols = [
            'direct']

        
        
    if "ICEV" in str(df.index.tolist()[0]):
        if "PM" in str(df.index.tolist()[0]):
            prod_cols = ['market for glider, passenger car',
           'market for internal combustion engine, passenger car',
                     'market for internal combustion engine, passenger car']

            direct_cols = ['market for road wear emissions, passenger car',
           'market for tyre wear emissions, passenger car', 
            'market for brake wear emissions, passenger car', 
            'direct']
            
        if "NOx" in str(df.index.tolist()[0]): 
            prod_cols = ['market for glider, passenger car',
           'market for internal combustion engine, passenger car',
                     'market for internal combustion engine, passenger car']

            direct_cols = ['direct']   
            
        if "ICEV-d" in str(df.index.tolist()[0]):
                fuel_cols = ['market for diesel']
        if "ICEV-p" in str(df.index.tolist()[0]):
                fuel_cols = ['market for petrol, low-sulfur']
            
    return df.assign(prod_disp = df[prod_cols].sum(1), direct = df[direct_cols].sum(1), fuel = df[fuel_cols].sum(1)).drop(prod_cols,1).drop(direct_cols[0:-1],1).drop(fuel_cols,1)
    



## Creating dataframe with LCA results for air pollutants  

For BEV with marginal electricity mix: 

In [72]:
# Aggregated dataframe
current_BEV_marginal_POF = agg_df(create_df([x for x in Database("Current cars marginal mix") if "BEV" in x['name']], ('ReCiPe Midpoint (H)', 'photochemical oxidant formation', 'POFP')), "yes")


current_BEV_marginal_PMF = agg_df(create_df([x for x in Database("Current cars marginal mix") if "BEV" in x['name']], ('ReCiPe Midpoint (H)', 'particulate matter formation', 'PMFP')), "yes")




For BEV with average electricity mix: 

In [73]:
# Aggregated dataframe
current_BEV_average_POF = agg_df(create_df([x for x in Database("Current cars") if "BEV" in x['name']], ('ReCiPe Midpoint (H)', 'photochemical oxidant formation', 'POFP')), "no")


current_BEV_average_PMF = agg_df(create_df([x for x in Database("Current cars") if "BEV" in x['name']], ('ReCiPe Midpoint (H)', 'particulate matter formation', 'PMFP')), "no")




In [74]:
current_BEV_average_POF

Unnamed: 0,Unnamed: 1,"maintenance, passenger car",market for road,direct,prod_disp,fuel
"BEV,Mini",NOx,7e-06,4.6e-05,0,0.000126,0.000104
"BEV,Mini",NMVOC,7e-06,3.6e-05,0,5.1e-05,1e-05
"BEV,Medium",NOx,1.3e-05,7.5e-05,0,0.000243,0.000133
"BEV,Medium",NMVOC,1.2e-05,5.9e-05,0,8.9e-05,1.3e-05
"BEV,Van",NOx,1.3e-05,7.6e-05,0,0.000244,0.000135
"BEV,Van",NMVOC,1.2e-05,6e-05,0,9.1e-05,1.4e-05
"BEV,Large",NOx,1.5e-05,8.4e-05,0,0.000273,0.000181
"BEV,Large",NMVOC,1.4e-05,6.7e-05,0,0.000102,1.8e-05
"BEV,Small",NOx,9e-06,5.5e-05,0,0.000169,0.000106
"BEV,Small",NMVOC,9e-06,4.3e-05,0,6.4e-05,1.1e-05


For conventional vehicles of different euro classes: 

In [75]:
result = list()
method = ('ReCiPe Midpoint (H)', 'particulate matter formation', 'PMFP')
for euro in ['pre euro 0','euro 0','euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6']:
    list_cars = [x for x in Database(euro) if "ICEV-d" in x['name']]
    temp = (create_df(list_cars,method))
    result.append(temp)
diesel_PMF = pd.concat([agg_df(result[i], "no") for i in range(8)], keys = ['pre euro 0','euro 0', 'euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6'])


result = list()

for euro in ['pre euro 0','euro 0','euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6']:
    list_cars = [x for x in Database(euro) if "ICEV-p" in x['name']]
    temp = (create_df(list_cars,method))
    result.append(temp)
petrol_PMF = pd.concat([agg_df(result[i], "no") for i in range(8)], keys = ['pre euro 0','euro 0', 'euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6'])

older_cars_PMF = petrol_PMF.append(diesel_PMF)

In [76]:
result = list()
method = ('ReCiPe Midpoint (H)', 'photochemical oxidant formation', 'POFP')
for euro in ['pre euro 0','euro 0','euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6']:
    list_cars = [x for x in Database(euro) if "ICEV-d" in x['name']]
    temp = (create_df(list_cars,method))
    result.append(temp)
diesel_POF = pd.concat([agg_df(result[i], "no") for i in range(8)], keys = ['pre euro 0','euro 0', 'euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6'])


result = list()

for euro in ['pre euro 0','euro 0','euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6']:
    list_cars = [x for x in Database(euro) if "ICEV-p" in x['name']]
    temp = (create_df(list_cars,method))
    result.append(temp)
petrol_POF = pd.concat([agg_df(result[i], "no") for i in range(8)], keys = ['pre euro 0','euro 0', 'euro 1', 'euro 2', 'euro 3', 'euro 4', 'euro 5', 'euro 6'])

older_cars_POF = petrol_POF.append(diesel_POF)