In [1]:
import ecoinvent_interface as ei
# Settings object carrying the account credentials handed to EcoinventRelease.
# The "XXX" values are placeholders: fill them in locally and do not commit
# real credentials with the notebook.
settings = ei.Settings(username="XXX", password="XXX")
release = ei.EcoinventRelease(settings)
# NOTE(review): the results of the next two calls are discarded (neither is the
# last expression of the cell), so nothing is displayed for them.
release.list_versions()
release.list_system_models('3.7.1')
# Request version 3.7.1, system model "apos", in ecospold format. The returned
# value is used further down as a path-like root that contains the
# "MasterData" and "datasets" directories.
RELEASE_PATH = release.get_release(
    version='3.7.1',
    system_model='apos',
    release_type=ei.ReleaseType.ecospold
)

In [2]:
# NOTE(review): this listing is discarded (it is not the last expression of the
# cell), so its result is never shown.
release.list_extra_files('3.7.1')
# Request the LCIA implementation archive for 3.7.1. LCIA_PATH is used later as
# a directory containing "LCIA_implementation_3.7.1.xlsx".
LCIA_PATH = release.get_extra(version='3.7.1', filename='ecoinvent 3.7.1_LCIA_implementation.7z')

In [3]:
from lxml import objectify

# Namespace prefix prepended to tag names when filtering XML children.
NS = "{http://www.EcoInvent.org/EcoSpold02}"

# Master-data index files located under the release directory.
ACTIVITIES_FP = RELEASE_PATH.joinpath("MasterData", "ActivityIndex.xml")
GEOGRAPHIES_FP = RELEASE_PATH.joinpath("MasterData", "Geographies.xml")
ACTIVITY_NAME_FP = RELEASE_PATH.joinpath("MasterData", "ActivityNames.xml")

# Labels for the integer ``specialActivityType`` codes. The position of each
# label in the tuple is its code, so the mapping covers 0 through 10.
SPECIAL_ACTIVITY_TYPE_MAP: dict[int, str] = dict(
    enumerate(
        (
            "ordinary transforming activity (default)",
            "market activity",
            "IO activity",
            "Residual activity",
            "production mix",
            "import activity",
            "supply mix",
            "export activity",
            "re-export activity",
            "correction activity",
            "market group",
        )
    )
)

def maybe_missing(element: objectify.ObjectifiedElement, attribute: str) -> str | None:
    """Return the ``.text`` of ``element.<attribute>``, or ``None`` if unavailable.

    Any ``AttributeError`` raised while looking up the attribute or its
    ``.text`` is treated as "missing". Note that a later cell rebinds the name
    ``maybe_missing`` to a variant with an extra parameter that returns ``""``.
    """
    try:
        text = getattr(element, attribute).text
    except AttributeError:
        text = None
    return text

# The XML files are parsed straight from their paths instead of from
# ``open(...)``: lxml then owns the file handle (no handle is left unclosed)
# and decodes the bytes according to the XML declaration rather than the
# platform's default text encoding.

# geography id -> geography name
geographies_mapping = {
    elem.get("id"): elem.name.text
    for elem in objectify.parse(str(GEOGRAPHIES_FP)).getroot().iterchildren(NS + "geography")
}
# activity-name id -> activity name
activity_names_mapping = {
    elem.get("id"): elem.name.text
    for elem in objectify.parse(str(ACTIVITY_NAME_FP)).getroot().iterchildren(NS + "activityName")
}

# activity id -> resolved name, geography, validity period and activity type.
# The name and geography ids are resolved through the two mappings above, and
# the integer type code through SPECIAL_ACTIVITY_TYPE_MAP; an unknown id or
# code raises KeyError.
activity_mapping = {
    elem.get("id"): {
        "name": activity_names_mapping[elem.get("activityNameId")],
        "geography": geographies_mapping[elem.get("geographyId")],
        "start": elem.get("startDate"),
        "end": elem.get("endDate"),
        "type": SPECIAL_ACTIVITY_TYPE_MAP[int(elem.get("specialActivityType"))],
    }
    for elem in objectify.parse(str(ACTIVITIES_FP)).getroot().iterchildren(NS + "activityIndexEntry")
}

In [5]:
activity_mapping["f7e93a25-56e4-4268-a603-3bfd57c79eff"]

{'name': '2-butanol production by hydration of butene',
 'geography': 'Global',
 'start': '1981-01-01',
 'end': '2005-12-31',
 'type': 'ordinary transforming activity (default)'}

In [6]:
# Master-data file listing the intermediate exchanges (products).
PRODUCTS_FP = RELEASE_PATH.joinpath("MasterData", "IntermediateExchanges.xml")

def maybe_missing(
    element: objectify.ObjectifiedElement, attribute: str, pi: bool | None = False
) -> str:
    """Return the text of an optional child of ``element``, or ``""`` if absent.

    Args:
        element: Parsed XML element to inspect.
        attribute: Name of the child to read. Ignored when ``pi`` is truthy.
        pi: When truthy, read the first ``text`` child of
            ``element.productInformation`` instead of ``attribute``.

    Returns:
        The text content as a string. ``""`` is returned when the child (or,
        for ``pi``, the ``productInformation`` element or its ``text`` child)
        does not exist, or when it exists but carries no text.
    """
    try:
        if pi:
            # The child is literally named "text", so it is looked up with
            # find() rather than by attribute access.
            node = element.productInformation.find(NS + "text")
        else:
            node = getattr(element, attribute)
    except AttributeError:
        return ""
    if node is None:
        # find() found no <text> child inside productInformation.
        return ""
    # Previously the ``pi`` branch returned the element itself; always hand
    # back plain text so both branches produce the same kind of value.
    return node.text or ""


# product id -> name, unit, optional comment / product information, and a
# {classification system: classification value} dict.
# The file is parsed from its path rather than from ``open(...)`` so that no
# file handle is left unclosed and lxml decodes the bytes according to the XML
# declaration instead of the platform's default text encoding.
product_mapping = {
    elem.get("id"): {
        "name": elem.name.text,
        "unit": elem.unitName.text,
        "comment": maybe_missing(elem, "comment"),
        "product_information": maybe_missing(elem, "productInformation", True),
        "classifications": {
            c.classificationSystem.text: c.classificationValue.text
            for c in elem.iterchildren(NS + "classification")
        },
    }
    for elem in objectify.parse(str(PRODUCTS_FP)).getroot().iterchildren()
}

In [7]:
FLOWS_FP = RELEASE_PATH / "MasterData" / "ElementaryExchanges.xml"

# elementary flow id -> name, unit, formula, CAS number, compartments, synonyms.
# The file is parsed from its path rather than from ``open(...)`` so that no
# file handle is left unclosed and lxml decodes the bytes according to the XML
# declaration instead of the platform's default text encoding.
ecosphere_flows_mapping = {
    elem.get("id"): {
        "name": elem.name.text,
        "unit": elem.unitName.text,
        # Missing or empty attributes are normalised to None.
        "chemical_formula": elem.get("formula") or None,
        "CAS": elem.get("casNumber") or None,
        # [compartment, subcompartment]
        "compartments": [
            elem.compartment.compartment.text,
            elem.compartment.subcompartment.text,
        ],
        "synonyms": [obj.text for obj in elem.iterchildren(NS + "synonym")],
    }
    for elem in objectify.parse(str(FLOWS_FP)).getroot().iterchildren(NS + "elementaryExchange")
}

In [8]:
from collections import Counter

# Dataset filenames have the form "<process id>_<product id>.spold"; collect
# both halves for every such file.
product_list, process_list = [], []

for filepath in (RELEASE_PATH / "datasets").iterdir():
    if ".spold" not in filepath.name:
        continue
    process, product = filepath.name.replace(".spold", "").split("_")
    process_list.append(process)
    product_list.append(product)

# The ten process ids that occur in the most dataset files.
Counter(process_list).most_common(10)

[('9aac0778-3c9c-4ca6-b3dd-0be8226231e1', 20),
 ('2b856090-9c59-4de8-819c-eaf92a8575aa', 19),
 ('50116c55-67c9-489f-b2f9-ce04f0d62a8b', 17),
 ('3a06bdf5-24c4-43c6-8bfb-b8e4ec829916', 10),
 ('e92d2e87-3ff5-4bc8-9a46-affcc4e0b068', 10),
 ('562af63a-2c99-4896-ad9c-dfddaa86e36d', 9),
 ('f761c9ab-3ffc-479d-8f51-b4e33fd8d6a5', 9),
 ('8980bfa0-a957-4a1f-9a63-8c3a26a04cce', 9),
 ('35aad4e8-0882-4d98-8377-8c9bddd31d3f', 8),
 ('7e7169e9-ad99-443c-942f-b36e28868b45', 7)]

In [9]:
activity_mapping['9aac0778-3c9c-4ca6-b3dd-0be8226231e1']

{'name': 'primary zinc production from concentrate',
 'geography': 'Rest-of-World',
 'start': '2015-01-01',
 'end': '2020-12-31',
 'type': 'ordinary transforming activity (default)'}

In [10]:
# Names of all products that have a dataset file for the process id below.
zinc_products = []

for filepath in (RELEASE_PATH / "datasets").iterdir():
    if ".spold" not in filepath.name:
        continue
    process, product = filepath.name.replace(".spold", "").split("_")
    if process != '9aac0778-3c9c-4ca6-b3dd-0be8226231e1':
        continue
    zinc_products.append(product_mapping[product]['name'])


sorted(zinc_products)

['ammonium sulfate',
 'cadmium',
 'cadmium sludge from zinc electrolysis',
 'cobalt',
 'copper concentrate, sulfide ore',
 'copper sulfate',
 'copper, cathode',
 'gold',
 'gypsum, mineral',
 'heat, from steam, in chemical industry',
 'indium rich leaching residues, from zinc production',
 'iron scrap, unsorted',
 'lead',
 'lead concentrate',
 'silver',
 'sulfur',
 'sulfur dioxide, liquid',
 'sulfuric acid',
 'zinc',
 'zinc monosulfate']

In [11]:
product_mapping['66c93e71-f32b-4591-901c-55395db5c132']

{'name': 'electricity, high voltage',
 'unit': 'kWh',
 'comment': '',
 'product_information': '',
 'classifications': {'By-product classification': 'allocatable product',
  'CPC': '17100: Electrical energy'}}

In [12]:
Counter(product_list).most_common(10)

[('66c93e71-f32b-4591-901c-55395db5c132', 2172),
 ('d69294d7-8d64-4915-a896-9996a014c410', 674),
 ('759b89bd-3aa6-42ad-b767-5bb9ef5d331d', 410),
 ('71e2f1db-a2c5-44d0-8337-dfff15be974d', 349),
 ('9b9edcf3-0539-4642-9516-0df642a5c41a', 228),
 ('4c2bb0f0-422d-4fb3-bcfe-a36ef1b55874', 197),
 ('d7432632-40dc-4af8-8125-cb70dd9742c5', 169),
 ('45fbbc41-7ae9-46cc-bb31-abfa11e69de0', 159),
 ('637ee275-a239-4dcb-b084-abfa110dd65b', 155),
 ('dd80f0f2-f4d5-40f0-9035-09c1a7f3f07b', 137)]

In [13]:
Counter(product_list)['66c93e71-f32b-4591-901c-55395db5c132']

2172

In [14]:
import hashlib

# Kept only for backward compatibility with code that may still call ``_``.
# NOTE(review): ``_`` is the conventional throwaway name and also IPython's
# "last output" variable, so this binding is easy to clobber by accident.
# ``unique_identifier`` below no longer depends on it.
_ = lambda str: str.encode("utf-8")


def unique_identifier(process_dict: dict, product_dict: dict, type: str) -> str:
    """Build a deterministic hex identifier for a process/product pair.

    The identifier is the MD5 hex digest of the UTF-8 encoding of the process
    name, product name, product unit, process geography and ``type``,
    concatenated in that order without separators.

    Args:
        process_dict: Mapping with string values under ``"name"`` and
            ``"geography"``.
        product_dict: Mapping with string values under ``"name"`` and
            ``"unit"``.
        type: Label mixed into the hash so the same pair yields different
            identifiers for different node kinds.

    Returns:
        A 32-character hexadecimal string.

    Raises:
        KeyError: If a required key is missing.
        AttributeError: If one of the values is not a string.
    """
    parts = (
        process_dict["name"],
        product_dict["name"],
        product_dict["unit"],
        process_dict["geography"],
        type,
    )
    payload = b"".join(part.encode("utf-8") for part in parts)
    # MD5 is used purely as a fingerprint here; flagging it as such keeps the
    # call working on builds that block MD5 for security purposes.
    return hashlib.md5(payload, usedforsecurity=False).hexdigest()

In [15]:
from dataclasses import dataclass

@dataclass
class TechnosphereEdge:
    """Directed edge between a process node and a product node.

    In this notebook, input exchanges are stored as product -> process edges
    with ``positive=False``; all other intermediate exchanges are stored as
    process -> product edges with ``positive=True``.
    """
    source: str  # Our unique identifier of the node the edge starts from
    target: str  # Our unique identifier of the node the edge points to
    amount: float  # Exchange amount as read from the dataset
    positive: bool = True  # False for edges created from input exchanges

@dataclass
class EcosphereEdge:
    """Link between an elementary flow and the process it is recorded for."""
    flow: str     # ecoinvent UUID of the elementary exchange
    process: str  # Our unique identifier of the process node
    amount: float  # Exchange amount as read from the dataset

In [16]:
import pyecospold
from pyecospold.model_v2 import IntermediateExchange, Activity, FlowData
from tqdm import tqdm

In [17]:
# Node registries, filled in by the dataset loop.
process_nodes = {}
product_nodes = {}

# Edge accumulators, filled in by the dataset loop.
technosphere_edges = []
ecosphere_edges = []

# Exchange group labels that mark an intermediate exchange as an input.
INPUTS = (
    "Materials/Fuels",
    "Electricity/Heat",
    "Services",
    "From Technosphere (unspecified)",
)


def get_process_id(edge: IntermediateExchange, activity: Activity) -> str:
    """Return the id of the process an exchange refers to.

    The exchange's ``activityLinkId`` is used when it is truthy; otherwise the
    id of ``activity`` itself is returned.
    """
    linked_id = edge.activityLinkId
    if linked_id:
        return linked_id
    return activity.id


def reference_product(flows: FlowData) -> str:
    """Return the intermediate exchange id of the single reference product.

    An exchange qualifies when its group is ``"ReferenceProduct"`` and its
    amount is non-zero.

    Raises:
        ValueError: If the number of qualifying exchanges is not exactly one.
    """
    matches = []
    for exchange in flows.intermediateExchanges:
        if exchange.groupStr == "ReferenceProduct" and exchange.amount != 0:
            matches.append(exchange)
    if len(matches) != 1:
        raise ValueError("Can't find reference product")
    return matches[0].intermediateExchangeId

In [18]:
# Walk every dataset file, register its process and product nodes, and record
# one edge per intermediate exchange and per elementary exchange.
# NOTE: the node dicts and edge lists are created in an earlier cell and only
# appended to here, so re-running this cell on its own duplicates every edge.
for filepath in tqdm((RELEASE_PATH / "datasets").iterdir()):
    is_dataset_file = filepath.name.endswith(".spold")
    if not is_dataset_file:
        continue

    ecospold = pyecospold.parse_file_v2(filepath)
    activity = ecospold.activityDataset.activityDescription.activity[0]
    flow_data = ecospold.activityDataset.flowData

    # Resolve the process and its reference product from the master data.
    this_process = activity_mapping[activity.id]
    this_product = product_mapping[reference_product(flow_data)]

    this_process_id = unique_identifier(this_process, this_product, "process")
    this_product_id = unique_identifier(this_process, this_product, "product")

    process_nodes[this_process_id] = (this_process, this_product)
    product_nodes[this_product_id] = (this_process, this_product)

    for edge in flow_data.intermediateExchanges:
        other_process = activity_mapping[get_process_id(edge=edge, activity=activity)]
        other_product = product_mapping[edge.intermediateExchangeId]
        other_product_id = unique_identifier(other_process, other_product, "product")

        # Inputs point from the consumed product to this process; every other
        # exchange points from this process to the product.
        is_input_edge = edge.groupStr in INPUTS
        technosphere_edges.append(TechnosphereEdge(
            source=other_product_id if is_input_edge else this_process_id,
            target=this_process_id if is_input_edge else other_product_id,
            amount=edge.amount,
            positive=not is_input_edge,
        ))

    ecosphere_edges.extend(
        EcosphereEdge(
            flow=edge.elementaryExchangeId,
            process=this_process_id,
            amount=edge.amount,
        )
        for edge in flow_data.elementaryExchanges
    )

19271it [02:12, 145.85it/s]


In [19]:
import pandas

# Read both sheets in one call so the workbook is opened and parsed only once.
# With a list for ``sheet_name`` pandas returns a dict of DataFrames keyed by
# sheet name.
lcia_sheets = pandas.read_excel(
    LCIA_PATH / "LCIA_implementation_3.7.1.xlsx", sheet_name=["CFs", "units"]
)
characterization_factors = lcia_sheets["CFs"]
characterization_units = lcia_sheets["units"]

In [20]:
@dataclass
class CharacterizationFactor:
    """Characterization factor attached to one elementary flow."""
    flow: str  # Elementary flow id, as looked up in ``lcia_reverse_mapping``
    amount: float  # Factor value taken from the CFs sheet row


# Reverse lookup: (flow name, compartment, subcompartment) -> elementary flow id.
lcia_reverse_mapping = {
    (flow["name"], *flow["compartments"]): flow_id
    for flow_id, flow in ecosphere_flows_mapping.items()
}

cf_rows = characterization_factors.values.tolist()

# One entry per distinct value of the first three columns, each starting with
# an empty list of characterization factors.
impact_categories = {}
for obj in cf_rows:
    impact_categories[tuple(obj[:3])] = {'cfs': []}

# Columns 3-5 identify the elementary flow (name, compartment, subcompartment);
# column 6 holds the factor value.
for obj in cf_rows:
    factor = CharacterizationFactor(
        flow=lcia_reverse_mapping[tuple(obj[3:6])],
        amount=obj[6],
    )
    impact_categories[tuple(obj[:3])]['cfs'].append(factor)

# Units come from the "units" sheet. The previous loop iterated over the CFs
# sheet, whose fourth column is the flow name (it is part of the
# ``lcia_reverse_mapping`` key), so every category ended up with a flow name
# stored as its unit.
# NOTE(review): assumes the units sheet uses the same first three columns as the
# CFs sheet to identify a category and holds the unit in the fourth - confirm
# against the workbook.
for obj in characterization_units.values.tolist():
    category_key = tuple(obj[:3])
    # Only annotate categories that have characterization factors.
    if category_key in impact_categories:
        impact_categories[category_key]['unit'] = obj[3]