In [2]:
from pathlib import Path
import os

from py_semantic_taxonomy.adapters.routers.validation import MultilingualString
from py_semantic_taxonomy.domain.constants import SKOS
from tqdm import tqdm
import httpx
import structlog
import xmltodict

from pyst_client.simple import *
from pyst_client.simple.client import WriteClient

In [3]:
def _existing_dir(env_var):
    """Return the directory named by the environment variable `env_var`.

    Raises `KeyError` when the variable is unset and `AssertionError` when
    its value does not point at an existing directory.
    """
    directory = Path(os.environ[env_var])
    assert directory.is_dir()
    return directory


# Token later assigned to `settings.api_key`; must be set and non-empty.
auth = os.environ["pyst_auth_token"]
assert auth

# One metadata folder per ecoinvent release; each is expected to contain
# the `IntermediateExchanges.xml` file parsed further down.
metadata_39_dir = _existing_dir("ecoinvent_39_metadata_dir")
metadata_310_dir = _existing_dir("ecoinvent_310_metadata_dir")
metadata_311_dir = _existing_dir("ecoinvent_311_metadata_dir")

In [11]:
def _load_intermediate_exchanges(metadata_dir):
    """Parse `IntermediateExchanges.xml` inside `metadata_dir`.

    Returns the content of the `validIntermediateExchanges` root element as
    produced by `xmltodict.parse`.
    """
    # The context manager closes the file once parsing is finished; handing a
    # bare `open(...)` result to the parser leaves the handle open.
    with open(metadata_dir / "IntermediateExchanges.xml", "rb") as xml_file:
        return xmltodict.parse(xml_file)['validIntermediateExchanges']


products_311 = _load_intermediate_exchanges(metadata_311_dir)
products_310 = _load_intermediate_exchanges(metadata_310_dir)
products_39 = _load_intermediate_exchanges(metadata_39_dir)

In [5]:
# Release label built from the root attributes of the 3.11 product file.
# This reads `products_311` explicitly: a bare `products` variable is never
# assigned in this notebook, so relying on it only works with leftover kernel
# state. The recorded output below ('3.11') confirms 3.11 is the intended file.
release = f"{products_311['@majorRelease']}.{products_311['@minorRelease']}"
release

'3.11'

In [6]:
# Storing temporarily for this script; use `.set_X` to set permanently
# API key comes from the `pyst_auth_token` environment variable read into `auth`.
settings.api_key = auth
# Base URL embeds the release label, e.g. ".../products/ecoinvent/3.11/".
# Presumably used as the prefix for IRIs created without an explicit `id_` - confirm.
settings.creation_base_url = f"https://vocab.sentier.dev/products/ecoinvent/{release}/"
# NOTE(review): the server host (vocab.cauldron.ch) differs from the host used
# in the IRIs (vocab.sentier.dev) - presumably intentional; confirm.
settings.server_url = "https://vocab.cauldron.ch"

In [7]:
# Describe the concept scheme for this ecoinvent release, then send it to the
# server. The value of `cs.save()` is the cell output.
scheme_label = f"ecoinvent Products {release}"
scheme_notation = f"ei-products-{release}"
scheme_definition = "Set of ecoinvent products (intermediate exchanges) extracted from publicly available metadata"

cs = ConceptScheme.create(
    pref_labels=[scheme_label],
    version=release,
    notations=[scheme_notation],
    definitions=[scheme_definition],
    creators=[{"@id": "https://ecoinvent.org/"}],
)
cs.save()

[2m2025-06-09 07:07:30[0m [[32m[1minfo     [0m] [1mServer URL https://vocab.cauldron.ch successfully loaded from secrets directory[0m
[2m2025-06-09 07:07:30[0m [[32m[1minfo     [0m] [1mDefault language `en` successfully loaded from secrets directory[0m
[2m2025-06-09 07:07:30[0m [[32m[1minfo     [0m] [1mServer URL `https://vocab.cauldron.ch` is healthy and reachable[0m
[2m2025-06-09 07:07:30[0m [[32m[1minfo     [0m] [1mAPI key successfully loaded from secrets directory[0m
[2m2025-06-09 07:07:30[0m [[32m[1minfo     [0m] [1mCreation base URL successfully loaded from secrets directory[0m


<Response [200 OK]>

In [8]:
def get_multilingual_strings(obj: dict, key: str, secondary_key: str | None = None) -> list[MultilingualString]:
    """Collect the language-tagged strings stored under `key` of a parsed XML element.

    `obj` is expected to be a dict produced by `xmltodict`, in which a text
    node with attributes looks like ``{'@xml:lang': 'en', '#text': '...'}``
    and a repeated element becomes a list of such dicts.

    Parameters
    ----------
    obj
        Parsed element to read from.
    key
        Child element to look up in `obj`.
    secondary_key
        Optional grandchild to descend into (``obj[key][secondary_key]``).

    Returns
    -------
    list[MultilingualString]
        One entry per node carrying a ``#text`` value. Empty when `key` or
        `secondary_key` is missing, or when the element has no content.

    Raises
    ------
    ValueError
        If a single (non-list) node is not a dict with an ``@xml:lang``
        attribute.
    """
    if key not in obj:
        return []
    data = obj[key]
    if secondary_key:
        # xmltodict yields None for an empty element and a plain str for a
        # text-only one; neither can hold a child element, so there is
        # nothing to extract (and `in` on them would raise or do a substring
        # test).
        if not isinstance(data, dict) or secondary_key not in data:
            return []
        data = data[secondary_key]
    if data is None:
        # Empty element: no strings to report.
        return []
    if isinstance(data, list):
        return [
            MultilingualString(**{"@language": elem['@xml:lang'], "@value": elem['#text']})
            for elem in data
            if isinstance(elem, dict) and '#text' in elem
        ]
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not (isinstance(data, dict) and '@xml:lang' in data):
        raise ValueError(
            f"Expected a dict with an '@xml:lang' attribute under {key!r}, got {data!r}"
        )
    if '#text' not in data:
        return []
    return [MultilingualString(**{"@language": data['@xml:lang'], "@value": data['#text']})]

In [9]:
# Filled by the upload loop: product `@id` -> the Concept object created for it.
concepts = dict()

In [10]:
# Create one concept per ecoinvent 3.11 product in the scheme `cs`, save it,
# and remember the created object under the product's `@id`.
# Iterates `products_311` explicitly: a bare `products` variable is never
# assigned in this notebook, so the loop would fail on a fresh kernel.
for concept in tqdm(products_311['intermediateExchange']):
    concept_id = concept["@id"]
    concept_obj = Concept.create(
        concept_scheme=cs,
        pref_labels=get_multilingual_strings(concept, 'name'),
        notations=[concept_id],
        definitions=get_multilingual_strings(concept, 'comment'),
        extra={
            f"{SKOS}scopeNote": get_multilingual_strings(concept, 'productInformation', 'text'),
        },
        # Built from a local variable rather than `concept["@id"]` inline:
        # reusing the enclosing quote type inside an f-string replacement
        # field only parses on Python 3.12+.
        id_=f"https://vocab.sentier.dev/products/ecoinvent/{release}/{concept_id}"
    )
    concept_obj.types.append("http://ontology.bonsai.uno/core#FlowObject")
    concept_obj.save()
    concepts[concept_id] = concept_obj

100%|█████████████████████████████████████████| 4309/4309 [06:05<00:00, 11.80it/s]


In [None]:
# Show the first parsed 3.11 product record. Uses `products_311` because a
# bare `products` variable is never assigned in this notebook.
products_311['intermediateExchange'][0]

In [None]:
# Scratch value: a CPC concept IRI whose colon is percent-encoded as "%3A".
# The bare string is only displayed as the cell output; it is not stored.
"https%3A//vocab.sentier.dev/products/cpc/2.1/CPCv2.1/871"

In [14]:
# Look up the concept schemes referenced by the correspondences below, by IRI.
# ecoinvent product schemes for releases 3.10, 3.11 and 3.9:
cs_310 = ConceptScheme.get_one("https://vocab.sentier.dev/products/ecoinvent/3.10/ei-products-3.10")
cs_311 = ConceptScheme.get_one("https://vocab.sentier.dev/products/ecoinvent/3.11/ei-products-3.11")
cs_39 = ConceptScheme.get_one("https://vocab.sentier.dev/products/ecoinvent/3.9/ei-products-3.9")
# Classification schemes used as mapping targets: CPC 2.1 and CN 2024.
cpc = ConceptScheme.get_one("https://vocab.sentier.dev/products/cpc/2.1/")
cn = ConceptScheme.get_one("http://data.europa.eu/xsp/cn2024/cn2024")

In [15]:
# Correspondence between the ecoinvent 3.11 product scheme and CPC.
# No `id_` is passed here; the last line displays the IRI that was assigned.
ei311_cpc_options = dict(
    compares=[cs_311, cpc],
    pref_labels=["ecoinvent-3.11-CPC-2.1"],
    version="2025.1",
    definitions=[""],
)
correspondence = Correspondence.create(**ei311_cpc_options)
correspondence.save()
correspondence.id_

'https://vocab.sentier.dev/products/ecoinvent/3.11/ei-products-3.11-CPCv2.1'

In [43]:
# Correspondence between the ecoinvent 3.11 product scheme and CN 2024,
# created under an explicitly chosen IRI.
ei311_cn_options = dict(
    compares=[cs_311, cn],
    pref_labels=["ecoinvent-3.11-CN-2024"],
    version="2025.1",
    definitions=[""],
    id_="https://vocab.sentier.dev/products/correspondence/ei-products-3.11-CN-2024",
)
correspondence_e11_cn = Correspondence.create(**ei311_cn_options)
correspondence_e11_cn.save()
correspondence_e11_cn.id_

'https://vocab.sentier.dev/products/correspondence/ei-products-3.11-CN-2024'

In [47]:
# Correspondence between the ecoinvent 3.10 product scheme and CN 2024,
# created under an explicitly chosen IRI.
ei310_cn_options = dict(
    compares=[cs_310, cn],
    pref_labels=["ecoinvent-3.10-CN-2024"],
    version="2025.1",
    definitions=[""],
    id_="https://vocab.sentier.dev/products/correspondence/ei-products-3.10-CN-2024",
)
correspondence_e10_cn = Correspondence.create(**ei310_cn_options)
correspondence_e10_cn.save()
correspondence_e10_cn.id_

'https://vocab.sentier.dev/products/correspondence/ei-products-3.10-CN-2024'

In [36]:
# Correspondence between the ecoinvent 3.10 product scheme and CPC.
# NOTE(review): no `id_` is passed, and the recorded output shows the assigned
# IRI under the ".../ecoinvent/3.11/" prefix even though this describes 3.10.
# Presumably a side effect of the creation base URL configured for 3.11;
# confirm whether that IRI is intended.
ei310_cpc_options = dict(
    compares=[cs_310, cpc],
    pref_labels=["ecoinvent-3.10-CPC-2.1"],
    version="2025.1",
    definitions=[""],
)
correspondence_310 = Correspondence.create(**ei310_cpc_options)
correspondence_310.save()
correspondence_310.id_

'https://vocab.sentier.dev/products/ecoinvent/3.11/ei-products-3.10-CPCv2.1'

In [53]:
# Correspondence between the ecoinvent 3.10 and 3.11 product schemes.
# No `id_` is passed here; the last line displays the IRI that was assigned.
ei310_ei311_options = dict(
    compares=[cs_310, cs_311],
    pref_labels=["ecoinvent-3.10-ecoinvent-3.11"],
    version="2025.1",
    definitions=[""],
)
correspondence_310_311 = Correspondence.create(**ei310_ei311_options)
correspondence_310_311.save()
correspondence_310_311.id_

'https://vocab.sentier.dev/products/ecoinvent/3.11/ei-products-3.10-ei-products-3.11'

In [23]:
# Fetch concepts for the CPC scheme's IRI (presumably every concept in that
# scheme - confirm against the client docs) and display how many came back.
cpc_concepts = Concept.get_many(cpc.id_)
len(cpc_concepts)

4597

In [25]:
# Set of CPC concept IRIs, used for membership checks when linking products.
cpc_ids = set(cpc_concept.id_ for cpc_concept in cpc_concepts)

In [33]:
# Fetch concepts for the CN 2024 scheme's IRI and display how many came back.
# NOTE(review): `timeout=20` is presumably a request timeout in seconds, raised
# because this result set is large (15033 in the recorded output) - confirm.
cn_concepts = Concept.get_many(cn.id_, timeout=20)
len(cn_concepts)

15033

In [34]:
# Set of CN 2024 concept IRIs, scanned by prefix when linking products.
cn_ids = set(cn_concept.id_ for cn_concept in cn_concepts)

In [35]:
# Peek at one CN concept IRI to see what the identifiers look like.
cn_concepts[0].id_

'http://data.europa.eu/xsp/cn2024/010011000090'

In [18]:
# Inspect one parsed 3.11 product record to see its fields, in particular the
# layout of the 'classification' entries used for the mappings.
products_311['intermediateExchange'][1]

{'@id': '6e19db2a-0fe3-45d7-897b-002c8dae678a',
 '@unitId': '487df68b-4994-4027-8fdc-a4dc298257b7',
 'name': {'@xml:lang': 'en',
  '#text': 'residues, MSWI[F]-WWT, WW from black chrome coating'},
 'unitName': {'@xml:lang': 'en', '#text': 'kg'},
 'classification': [{'@classificationId': 'ee3238ec-ca7a-4ddc-af1b-e7c35957b9b6',
   'classificationSystem': {'@xml:lang': 'en',
    '#text': 'By-product classification'},
   'classificationValue': {'@xml:lang': 'en', '#text': 'Waste'}},
  {'@classificationId': '6d3f4523-869c-45b8-a1cc-67ff0e657c80',
   'classificationSystem': {'@xml:lang': 'en', '#text': 'CPC'},
   'classificationValue': {'@xml:lang': 'en',
    '#text': '39320: Ash and residue (except from the manufacture of iron or steel), containing metals or metallic compounds, excep[…]'}},
  {'@classificationId': 'bc50c3a4-f699-5f17-a9fc-040ff213a598',
   'classificationSystem': {'@xml:lang': 'en', '#text': 'HS2017'},
   'classificationValue': {'@xml:lang': 'en',
    '#text': '262110: Slag 

In [37]:
# Link each ecoinvent 3.10 product to its CPC class, for products that carry
# a CPC classification whose IRI exists in `cpc_ids`.
for product in tqdm(products_310['intermediateExchange']):
    try:
        classifications = product.get('classification') or []
        if isinstance(classifications, dict):
            # xmltodict returns a bare dict, not a one-item list, for an
            # element that occurs only once. Iterating that dict would yield
            # its keys, and the product would be skipped by mistake.
            classifications = [classifications]
        # Values look like "39320: Ash and residue ..."; keep the code before
        # the colon. `None` means the product has no CPC entry.
        cpc_label = next(
            (
                entry['classificationValue']['#text'].split(":")[0]
                for entry in classifications
                if entry.get("classificationSystem", {}).get("#text") == 'CPC'
            ),
            None,
        )
    except (AttributeError, KeyError, TypeError):
        # Malformed classification entry: skip this product (best effort),
        # without also swallowing KeyboardInterrupt or unrelated errors.
        continue
    if not cpc_label:
        continue
    cpc_iri = f"https://vocab.sentier.dev/products/cpc/2.1/CPCv2.1/{cpc_label}"
    if cpc_iri not in cpc_ids:
        continue
    assoc = Association.create(
        correspondence=correspondence_310,
        source_concepts=[
            # Can also be a `Concept` instance
            {"@id": f'https://vocab.sentier.dev/products/ecoinvent/3.10/{product["@id"]}'}
        ],
        target_concepts=[
            {"@id": cpc_iri}
        ],
    )
    assoc.save()

100%|█████████████████████████████████████████| 4057/4057 [06:24<00:00, 10.55it/s]


In [32]:
# Link each ecoinvent 3.11 product to its CPC class, for products that carry
# a CPC classification whose IRI exists in `cpc_ids`.
for product in tqdm(products_311['intermediateExchange']):
    try:
        classifications = product.get('classification') or []
        if isinstance(classifications, dict):
            # xmltodict returns a bare dict, not a one-item list, for an
            # element that occurs only once. Iterating that dict would yield
            # its keys, and the product would be skipped by mistake.
            classifications = [classifications]
        # Values look like "39320: Ash and residue ..."; keep the code before
        # the colon. `None` means the product has no CPC entry.
        cpc_label = next(
            (
                entry['classificationValue']['#text'].split(":")[0]
                for entry in classifications
                if entry.get("classificationSystem", {}).get("#text") == 'CPC'
            ),
            None,
        )
    except (AttributeError, KeyError, TypeError):
        # Malformed classification entry: skip this product (best effort),
        # without also swallowing KeyboardInterrupt or unrelated errors.
        continue
    if not cpc_label:
        continue
    cpc_iri = f"https://vocab.sentier.dev/products/cpc/2.1/CPCv2.1/{cpc_label}"
    if cpc_iri not in cpc_ids:
        continue
    assoc = Association.create(
        correspondence=correspondence,
        source_concepts=[
            # Can also be a `Concept` instance
            {"@id": f'https://vocab.sentier.dev/products/ecoinvent/3.11/{product["@id"]}'}
        ],
        target_concepts=[
            {"@id": cpc_iri}
        ],
    )
    assoc.save()

100%|█████████████████████████████████████████| 4309/4309 [06:39<00:00, 10.79it/s]


In [41]:
# Spot check: which CN 2024 IRIs start with the six-digit code 080410?
[cn_iri for cn_iri in cn_ids if cn_iri.startswith("http://data.europa.eu/xsp/cn2024/080410")]

['http://data.europa.eu/xsp/cn2024/080410000080']

In [48]:
import uuid

def u():
    """Return a fresh random UUID (version 4) as 32 hex characters, without dashes."""
    random_uuid = uuid.uuid4()
    return random_uuid.hex


# Link each ecoinvent 3.10 product to a CN 2024 concept in both directions.
# The product's HS2017 code is used as a prefix of the CN IRI, and the
# lexicographically smallest matching IRI is chosen.
for product in tqdm(products_310['intermediateExchange']):
    try:
        classifications = product.get('classification') or []
        if isinstance(classifications, dict):
            # xmltodict returns a bare dict, not a one-item list, for an
            # element that occurs only once. Iterating that dict would yield
            # its keys, and the product would be skipped by mistake.
            classifications = [classifications]
        # Values look like "262110: Slag ..."; keep the code before the colon.
        cn_label = next(
            (
                entry['classificationValue']['#text'].split(":")[0]
                for entry in classifications
                if entry.get("classificationSystem", {}).get("#text") == 'HS2017'
            ),
            None,
        )
    except (AttributeError, KeyError, TypeError):
        # Malformed classification entry: skip this product (best effort),
        # without also swallowing KeyboardInterrupt or unrelated errors.
        continue
    if not cn_label:
        # No HS2017 entry, or an empty code. An empty code would otherwise
        # prefix-match every CN IRI.
        continue
    cn_prefix = f"http://data.europa.eu/xsp/cn2024/{cn_label}"
    # Same result as sorted(...)[0], without building and sorting a list.
    cn_iri = min((iri for iri in cn_ids if iri.startswith(cn_prefix)), default=None)
    if cn_iri is None:
        continue
    product_iri = f'https://vocab.sentier.dev/products/ecoinvent/3.10/{product["@id"]}'
    # First product -> CN, then CN -> product; each association gets its own id.
    for source_iri, target_iri in ((product_iri, cn_iri), (cn_iri, product_iri)):
        assoc = Association.create(
            correspondence=correspondence_e10_cn,
            source_concepts=[
                {"@id": source_iri}
            ],
            target_concepts=[
                {"@id": target_iri}
            ],
            id_=f"https://vocab.sentier.dev/products/correspondence/ei-products-3.10-CN-2024/{u()}"
        )
        assoc.save()

100%|█████████████████████████████████████████| 4057/4057 [08:39<00:00,  7.81it/s]


In [None]:
'https://vocab.sentier.dev/products/ecoinvent/3.11/ei-products-3.10-CPCv2.1'

In [52]:
# Product `@id` values present in each release, as sets for fast membership tests.
ids_311 = set(exchange["@id"] for exchange in products_311['intermediateExchange'])
ids_310 = set(exchange["@id"] for exchange in products_310['intermediateExchange'])

In [55]:
import uuid

def u():
    """Return a fresh random UUID (version 4) as 32 hex characters, without dashes.

    Note: this repeats the identical helper defined in an earlier cell.
    """
    random_uuid = uuid.uuid4()
    return random_uuid.hex


# For every product `@id` present in both releases, record the 3.11 <-> 3.10
# link in both directions, each association under its own random id.
for id_311 in tqdm(ids_311):
    if id_311 in ids_310:
        iri_in_311 = f'https://vocab.sentier.dev/products/ecoinvent/3.11/{id_311}'
        iri_in_310 = f'https://vocab.sentier.dev/products/ecoinvent/3.10/{id_311}'
        # First 3.11 -> 3.10, then 3.10 -> 3.11.
        for source_iri, target_iri in ((iri_in_311, iri_in_310), (iri_in_310, iri_in_311)):
            assoc = Association.create(
                correspondence=correspondence_310_311,
                source_concepts=[{"@id": source_iri}],
                target_concepts=[{"@id": target_iri}],
                id_=f"https://vocab.sentier.dev/products/ecoinvent/3.11/ei-products-3.10-ei-products-3.11/{u()}"
            )
            assoc.save()


100%|█████████████████████████████████████████| 4309/4309 [16:59<00:00,  4.23it/s]
