In [18]:
%pip install pymrio
import pymrio
import re
import string
import pandas as pd
import numpy as np
from IPython.display import display
import matplotlib.pyplot as plt
%pip install seaborn
import seaborn as sns
import os

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [19]:
#Data cleaning process for coicop codes (hbs_conversion_weight_table)
def clean_up(text):
    cleaned = text.translate(str.maketrans('', '', string.punctuation+string.digits+'\n')).lower().strip()
    return cleaned

def clean_coicop_label(label):
    return re.sub(r'\(.*?\)', '', label).strip()

# Function to process COICOP codes (make them HBS-COMPATIBLE)
def process_code(code):
    if pd.isna(code):
        return None
    code = code.lstrip('c')
    code = code.replace('.', '')
    return f'EUR_HE{code}'

In [20]:
def get_region_hbs_fts(X_i, weight_map):
  """Given a region X_i (a column from X) it computes the corresponding region footprints for hbs EUR_HE only  

    X the whole matrix 
    returns a table in the form: category| footprint|
                                 EUR_HExx| value
  """
  data = pd.DataFrame(columns=['category','footprint'])
  #Determine HBS Categories env impacts starting from EXIOBASE env impacts  (following weight_map)
  regions = X_i.index.levels[0]
  categories = weight_map.index

  for hbscat in categories:
    cat_footprint=0
    resp_cat = weight_map.loc[hbscat]
    resp_cat = resp_cat[resp_cat!=0].index.tolist()
    for exiocat in resp_cat:
      rows_sum=X_i.loc[X_i.index.get_level_values('sector') == exiocat].sum()
      cat_footprint += (weight_map.at[hbscat, exiocat]*rows_sum)

    new_row = pd.DataFrame([[hbscat, cat_footprint]], columns=['category', 'footprint'])
    data = pd.concat([data, new_row], ignore_index=True)
  

  return data


In [21]:
#HBS Env factors
def get_hbs_env(country_hbs,country_env):
  """
  Returns the hbs file where only category expenditures are replaced with corresponding footprint values

  country_hbs:
              is the hbs_hh file for the given country (and given year) e.g. IT_HBS_2010
  country_env:
              is a footprint hashmap: where keys are strings of  hbs EUR categories, and values contain unitary footprint associated to the category. (these footprints are specific to the given country and year)
              e.g. 'EUR_HE011' = 2
                   'EUR_HJ01' =  3
                   for IT 2010

  NB:The country_env MUST contain all hbs expenditure categories for which env factors are computed, variables not affected should be omitted (please refer to HBS UserManual)
     categories different from EUR_HE or EUR_HJ should always be omitted.
     if the footprint estimate for any expenditure category is not available then its value in country_env must be at 0
  """

  #create extended footprints vector (country specific) with same format as hbs file headers
  ex_ft = country_hbs.iloc[0:1].copy() # Create a copy of the original DataFrame
  ex_ft.loc[:] = 1


  #for each hbs EUR cat in country_env set the value in ex_ft
  for EUR_cat in country_env.keys():
    if EUR_cat in ex_ft.columns: #safety check, avoids adding non-existing cols
      ex_ft[EUR_cat] = country_env[EUR_cat]
      country_hbs[EUR_cat]=country_hbs[EUR_cat].fillna(0) #avoids NaN values in later operations


  #perform element wise broadcasted multiplication
  headers = country_hbs.columns
  A = np.array(country_hbs)
  x = np.array(ex_ft)

  for EUR_cat in country_env.keys():
      headers = [header.lower() if header == EUR_cat else header for header in headers]

  # Reshape x to make it compatible for broadcasting along the columns of A
  # Perform element-wise multiplication using broadcasting
  B = A * x  # Each column of A is multiplied by the corresponding element in x
  #re-attach hbs headers
  B_df = pd.DataFrame(B, columns=headers)
  return B_df

In [22]:
class Aggregator():

    def __init__(self, level_map):
        """
        A simple aggregator
        takes an hash map, where key is the level and value is a set of labels categories for the level
        where each element is a set of labels categories for the specific level
        lev_labels_list first element is a set of categories for the first level, ..., lev_labels_list[i] must contain set categories for level[i]


        the order is dictated by the fact that categories at level [i+1] are subcategories of categories at level[i]
        """

        self.number_levels = len(level_map)
        self.level_map = level_map


    def get_aggregates(self, df):
        """Pass a dataframe that contains as column headers the set given by the union of label sets present in each level of the level_map
          the values of such df are the ones that needs to be aggregated column-wise

        """
        aggregates = df.copy()
        #from the bottom level to upper level (0) (it skips the lowest level as no aggregation is needed)
        for level in range(len(self.level_map)-1, 0,-1):
            current_lev_cats = self.level_map[level]
            for cat in current_lev_cats:
                subcat_to_sum = self.level_map[level+1]
                pattern = rf"^{re.escape(cat)}"
                subcat_to_sum = set([eur_cat for eur_cat in subcat_to_sum if re.match(pattern, eur_cat)])
                #aggregate level-wise (bottom-up) for EUR categories
                #recall now modified cat are in lower_case

                Sum = pd.DataFrame({
                    'sum': [0] * len(df)
                })

                for subcat in subcat_to_sum:
                  Sum['sum'] +=  df[subcat.lower()]

                if (aggregates[cat.lower()] == 0).all():
                    aggregates[cat.lower()] = Sum['sum']
                elif (aggregates[cat.lower()] < Sum['sum']).any():
                    print('Please check correctness of your footprint estimates! for '+cat+' category and '+str(subcat_to_sum))


        return aggregates

In [None]:
#--ARGS--
REGIONS = ["BE","BG","CY","CZ","DE","DK","EE","EL","ES","FI","FR","HR","HU","IE","IT"]
YEARS = ["2010"]
ENV_FACTS = ["GHG emissions (GWP100) | Problem oriented approach: baseline (CML, 2001) | GWP100 (IPCC, 2007)"]
on_Colab = False  #set this to true if working on Colab
working_DIR="carb_em_project"
working_DIR =  os.path.join(os.getcwd(), working_DIR) 

if on_Colab:
    from google.colab import drive
    working_DIR="/content/drive/MyDrive/carb_em_project"

weight_table_PATH = working_DIR
#load the coicop - exiobase conversion weights map (aka weight_table)
weights = pd.read_excel(os.path.join(weight_table_PATH,'COICOP_EU_ini.xlsx') ,sheet_name='bridge_ini', header=1)
weights = weights.iloc[:63, 2:] #exclude false row/cols
weights = weights.rename(columns={'Unnamed: 2': 'COICOP'})
#we match coicop labels from weight_table with hbs codes
hash_labels = pd.read_excel(os.path.join(weight_table_PATH, 'COICOP_EXIOBASE.xlsx') ,sheet_name='COICOP_to_EXIOBASEprod', header=None)
# clean the coicop labels and produce a map that reports the actual HBS code for the given COICOP category
hash_labels[1] = hash_labels[1].apply(process_code)
hash_labels[2] = hash_labels[2].apply(clean_coicop_label)
hash_labels[2] = hash_labels[2].apply(clean_up)
hash_map = hash_labels[[1, 2]].set_index(2).to_dict()[1]
#Manually add missing labels - hbs code (Following HBS DOCUMENTATION)
hash_map['nonalcoholic beverages']='EUR_HE012'
hash_map['alcoholic beverages']='EUR_HE021'
hash_map['clothing']='EUR_HE031'
hash_map['footwear']='EUR_HE032'
hash_map['actual rent']='EUR_HE041'
hash_map['imputed rent']='EUR_HE042'
hash_map['maintenance and repair of the dwelling']='EUR_HE043'
hash_map['water supply and miscellaneous services reacting to the dwelling']='EUR_HE044'
hash_map['household appliances']='EUR_HE053'
hash_map['tools and equipment for house and garden']='EUR_HE055'
hash_map['medical products appliances and equipment']='EUR_HE061'
hash_map['outpatient services'] = 'EUR_HE062'
hash_map['purchase of vehicles']='EUR_HE071'
hash_map['communication'] = 'EUR_HE08'
hash_map['audiovisual photographic and information processing equipment'] = 'EUR_HE091'
hash_map['other major durables for recreation and culture']='EUR_HE092'
hash_map['other recreational items and equipment gardens and pets'] = 'EUR_HE093'
hash_map['recreational and cultural services']='EUR_HE094'
hash_map['newspapers books and stationery'] = 'EUR_HE095'
hash_map['preprimary and primary education secondary education postsecondary education tertiary education and education not defined by level'] = 'EUR_HE10'
hash_map['catering services'] = 'EUR_HE111'
hash_map['personal care']='EUR_HE121'
hash_map['insurance']='EUR_HE125'
hash_map = {key: value.strip() for key, value in hash_map.items()}
display(hash_map)
#replace coicop labels in original weight_table with actual HBS code using the previously created hash map
weights['COICOP'] = weights['COICOP'].apply(clean_up)
weights['COICOP'] = weights['COICOP'].replace(hash_map)
weights.set_index('COICOP', inplace=True)
weights = weights.iloc[:, :-1] #exclude last column (total)
weights.columns = weights.columns.map(clean_up)
display(weights)


for year in YEARS:
    working_DIR= os.path.join(working_DIR, year)
    if not os.path.exists(working_DIR):
        os.makedirs(working_DIR)

    if on_Colab:  #actually this should be run always regardless of colab, but on low-end pc I use pre-computed tables
        exiobase_Par=os.path.join(working_DIR,"IOT_"+year+"_pxp.zip")
        exio_download = pymrio.download_exiobase3(storage_folder=working_DIR, system="pxp", years=year)
        exiobase3 = pymrio.parse_exiobase3(path=exiobase_Par)
        exiobase3.calc_all()
        _regions = exiobase3.Y.columns.get_level_values(0).unique()
        Y_a = pd.DataFrame(index=exiobase3.Y.index)
        for _region in _regions:
            # Select all 7 columns for the region and aggregate them
            region_columns = exiobase3.Y.loc[:, _region]
            Y_a[_region] = region_columns.sum(axis=1)
        

        for env_fact in ENV_FACTS:
            P=exiobase3.impacts.S.loc[env_fact]
            P = P/1000000
            X = exiobase3.L.dot(Y_a)
            X = X.multiply(P, axis=0)  # element-wise multiplication
            X.to_csv(os.path.join(working_DIR,"X_"+env_fact[:3]+".csv"), index=True,  header=True)

    if not os.path.exists(working_DIR+"/out"):
        os.makedirs(working_DIR+"/out")

    for env_fact in ENV_FACTS:
        X = pd.read_csv(os.path.join(working_DIR,"X_"+env_fact[:3]+".csv"),  index_col=[0, 1],)
        _regions = X.columns.unique()
        #clean Exiobase labels for X matrix
        X.index = X.index.set_levels(
                X.index.levels[1].map(clean_up),  
                level=1                   
        )

        #compute for each region in hbs EUR_HE env footprints, and add them to ft table
        footprints =  pd.DataFrame(columns=['region', 'category','footprint'])
        for _region in _regions:
            o=get_region_hbs_fts(X[_region],weights)
            for index, row in o.iterrows():
                new_row = {'region': _region, 'category': row['category'], 'footprint': row['footprint']}
                footprints.loc[len(footprints)] = new_row

        footprints.to_csv(os.path.join(working_DIR,"footprints_"+env_fact[:3]+".csv"), index=False)

        for country_ticker in REGIONS:
            #fixed a year, this runs region by region
            hbs_hh= pd.read_excel(os.path.join(working_DIR,country_ticker+"_HBS_hh.xlsx"))
            #building the footprint tables by adding only Expenditure categories vars of hbs file
            EUR_HE_cats = [category for category in hbs_hh.columns if re.match(r"^EUR_HE.*", category)]  #HOME expens
            EUR_HJ_cats = [category for category in hbs_hh.columns if re.match(r"^EUR_HJ.*", category)]  #ABROAD expens

            ft = dict(zip(EUR_HE_cats, [0]*len(EUR_HE_cats)))   #from here one can notice that an envfactor of zero for a given expcategory  does not necessarily mean that its associated footprint is actually 0 (can be that we have no data on footprint of the category)
            #ft will later contain the effective  region specific footprints for each category
            #HBS structure presents EUR_HE cat in 4 levels (where first means the top-level categories):
            level_map = {}
            for i in range(5, 1, -1):
                #determine level (i-1) EUR categories using RegEx and HBS structure guidelines
                pattern = rf"EUR_HE\d{{{i}}}\b"
                curr_level_cat = [category for category in (set(EUR_HE_cats)) if re.match(pattern, category)]
                curr_level_cat = set(curr_level_cat)
                level_map[i-1] = curr_level_cat

            aggregator = Aggregator(level_map)
            saving_path = os.path.join(working_DIR,"out")
            
            footprints = pd.read_csv(os.path.join(working_DIR,"footprints_"+env_fact[:3]+".csv"))
            filtered_fact = footprints[footprints['region'] == country_ticker]

            for index, row in filtered_fact.iterrows():
                if row['category'] in ft.keys(): #avoids adding non-existing EUR_cat to ft
                    ft[row['category']] = row['footprint']

            result = get_hbs_env(hbs_hh,ft)
            #result.to_csv(os.path.join(saving_path,country_ticker+'_'+year+'_'+"ENVHBS_hh.csv"), index=False)  before aggregation

            #aggregate
            result = aggregator.get_aggregates(result)
            
            #eur_he00 follows special aggregation process
            Sum = pd.DataFrame({
                    'sum': [0] * len(result)
                })
            level_cats = level_map[1] - set('EUR_HE00')
            for cat in level_cats:
                Sum['sum'] +=  result[cat.lower()]

                if (result['eur_he00'] == 0).all():
                    result['eur_he00'] = Sum['sum']
                elif (result['eur_he00'] < Sum['sum']).any():
                    print('Please check correctness of your footprint estimates! for EUR_HJ00 category and '+str(level_cats))
            
                #compute now eur_hjxx env fact
            ft = dict(zip(EUR_HJ_cats, [0]*len(EUR_HJ_cats))) 
            result.to_csv(os.path.join(saving_path,country_ticker+'_'+year+'_'+env_fact[:3]+"_ENVHBS_hh.csv"), index=False)  
            #divide each eur_he00 (first level, to which EUR_HJ are referred to) by EUR_HE00 (original hbs)
            #use such values as env_footprint for eur_hj for current region

            # Sum = pd.DataFrame({
            #                 'sum': [0] * len(result)
            #                     })
            # level_cats = set(EUR_HJ_cats) - set(['EUR_HJ00', 'EUR_HJ90'])
            # for cat in level_cats:
            #     Sum['sum'] +=  result[cat.lower()]

            #     if (result['eur_hj00'] == 0).all():
            #         result['eur_hj00'] = Sum['sum']
            #     elif (result['eur_hj00'] < Sum['sum']).any():
            #         print('Please check correctness of your footprint estimates! for EUR_HJ00 category and '+str(level_cats))

                        #produce_env_hbs(region,env_fact)  #args will be named:  country_ticker, year, env_fact
                        #place again code agnostic blocks 

### After Exiobase computation, use weight table to compute hbs env factors

###Load HBS file structure and Compute Environmental HBS

In [None]:
#NOT IN USE
#define hash_map for manual matching indirect home (HJ_xx) exploiting HE_xx
i_hash_map = {}

#Manually add missing labels - hbs code (Following HBS DOCUMENTATION)
i_hash_map['EUR_HJ00'] = ''
i_hash_map['EUR_HJ01']='EUR_HE01' #Food and non-alcoholic beverages
i_hash_map['EUR_HJ02']='EUR_HE02' #Alcoholic beverages, tobacco and narcotics
i_hash_map['EUR_HJ03']='' #Clothing and footwear
i_hash_map['EUR_HJ04']='' #Housing, water, electricity, gas and other fuels
i_hash_map['EUR_HJ05']='' #Furnishings, household equipment and routine household maintenance
i_hash_map['EUR_HJ06']='' #Health
i_hash_map['EUR_HJ07']='' #Transport
i_hash_map['EUR_HJ08']='' #Communication
i_hash_map['EUR_HJ09']='' #Recreation and culture
i_hash_map['EUR_HJ10']='' #Education
i_hash_map['EUR_HJ11']='' #Restaurants and hotels
i_hash_map['EUR_HJ12']='' #Miscellaneous goods and services
#instead of doing it manually
for key in i_hash_map:
    i_hash_map[key] = key.replace('J', 'E')
i_hash_map['EUR_HJ90'] = '' #Consumption expenditure on travelling and holidays done abroad  (no MATCH for EUR_HExx)