In [1]:
import randonneur_data as rd
import randonneur as rn
from flowmapper.extraction.ecospold2 import remove_conflicting_synonyms, reformat
from pathlib import Path
import xmltodict
import structlog
import logging
from pathlib import Path
from tqdm import tqdm
from copy import deepcopy
from collections import defaultdict

In [2]:
# `import logging` does not load the `logging.config` submodule; import it
# explicitly instead of relying on some other package having imported it.
import logging.config

# Send every stdlib log record (root logger, DEBUG and above) to ./test.log.
logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "file": {
            "level": "DEBUG",
            "class": "logging.handlers.WatchedFileHandler",
            "filename": "test.log",
        },
    },
    "loggers": {
        "": {
            "handlers": ["file"],
            "level": "DEBUG",
            "propagate": True,
        },
    }
})
# structlog events are handed to the stdlib loggers configured above and
# rendered as JSON by JSONRenderer.
# NOTE(review): `wrap_for_formatter` is normally paired with a
# `structlog.stdlib.ProcessorFormatter` on the handler; the "file" handler has
# none, so confirm the resulting log lines look as intended.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)
logger = structlog.get_logger("ecoinvent-migrate")

In [3]:
# Handle to the randonneur_data registry: published change files are read from it
# (`get_file`) and the datapackages written below are registered in it (`add_file`).
registry: rd.Registry = rd.Registry()

In [4]:
def get_elem_flow_data(version: str) -> list[dict]:
    path = Path(f"/Users/cmutel/Library/Application Support/EcoinventInterface/cache/ecoinvent {version}_cutoff_ecoSpold02/MasterData/ElementaryExchanges.xml")

    if not path.is_file():
        path = Path(f"/Users/cmutel/Library/Application Support/EcoinventInterface/cache/ecoinvent {version}_cut-off_ecoSpold02/MasterData/ElementaryExchanges.xml")
    
    with open(path) as fs:
        ei_xml = xmltodict.parse(fs.read(), strip_whitespace=False)[
            "validElementaryExchanges"
        ]["elementaryExchange"]

    data = remove_conflicting_synonyms([reformat(obj) for obj in ei_xml])

    for obj in data:
        if "formula" in obj:
            del obj["formula"]
        if "cas_number" in obj and not obj["cas_number"]:
            del obj["cas_number"]

    return data

In [5]:
def get_elem_flow_dict(version: str) -> dict:
    """Index the elementary flows of ``version`` by their ``identifier``.

    If two flows share an identifier, the one appearing later wins.
    """
    flows_by_id = {}
    for flow in get_elem_flow_data(version):
        flows_by_id[flow['identifier']] = flow
    return flows_by_id

In [6]:
def add_comment(comment: str | None, addition: str, deletions: list[str] = ["replaced"]) -> str:
    if comment is None:
        comment = ""
    
    for deletion in deletions:
        if comment == deletion:
            comment = ""

    if comment and not comment.endswith("."):
        comment += "."

    if comment:
        return comment + " " + addition
    else:
        return addition

In [7]:
def generate_changes(
    source_version: str,
    target_version: str
) -> list[dict]:
    """Diff the elementary-flow master data of two ecoinvent releases.

    Flows are matched on ``identifier``. Flows missing from ``target_version`` are
    only logged at DEBUG level, and flows whose attributes are identical are
    skipped. Every other flow yields a change record containing:

    - the full ``source``/``target`` flow dicts;
    - a comment listing the changed attributes;
    - the ``source_version``/``target_version`` labels.

    Args:
        source_version: older ecoinvent release, e.g. "3.7".
        target_version: newer ecoinvent release, e.g. "3.8".

    Returns:
        List of change records, in the iteration order of the source flows.
    """
    source_flow_dict = get_elem_flow_dict(source_version)
    target_flow_dict = get_elem_flow_dict(target_version)

    changes = []

    for s_key, s_data in source_flow_dict.items():
        if s_key not in target_flow_dict:
            logger.debug("Elementary flow deleted: %s", s_data)
            continue

        t_data = target_flow_dict[s_key]
        if t_data == s_data:
            continue

        # List modified and newly added attributes first, then removed ones.
        # Comparing only shared keys produced "Changed  from ..." when an
        # attribute (e.g. cas_number) was merely added or dropped.
        changed = [key for key, value in t_data.items() if key not in s_data or s_data[key] != value]
        changed += [key for key in s_data if key not in t_data]
        attributes = ", ".join(changed)

        changes.append({
            "source": s_data,
            "target": t_data,
            "comment": f"Changed {attributes} from {source_version} to {target_version}.",
            "source_version": f"ecoinvent-{source_version}-biosphere",
            "target_version": f"ecoinvent-{target_version}-biosphere",
        })

    return changes

In [8]:
def supplement_changes(
    changes: list[dict],
    source_version: str,
    target_version: str
) -> list[dict]:
    """Use the `uuid` in the change list to add in other attributes not given in change list.

    Each record must carry ``source["uuid"]`` and ``target["uuid"]``. For each record:

    - ``formula`` is removed;
    - the matching master-data flow of the respective release is merged in
      (master data wins over attributes in the record);
    - ``uuid`` is dropped;
    - a "Changed ..." sentence is appended to the comment;
    - the version labels are set.

    Records are modified in place, and the same list is returned.

    Raises:
        KeyError: if a uuid is not present in the corresponding release's master data.
    """
    source_flow_dict = get_elem_flow_dict(source_version)
    target_flow_dict = get_elem_flow_dict(target_version)

    def _master_data(flows: dict, uuid: str, version: str) -> dict:
        # Re-raise with the offending uuid and release so failures are traceable.
        try:
            return flows[uuid]
        except KeyError:
            raise KeyError(
                f"Elementary flow {uuid} not found in ecoinvent {version} master data"
            ) from None

    for change in changes:
        source, target = change["source"], change["target"]
        source.pop("formula", None)
        target.pop("formula", None)

        # Master data is keyed by `identifier`, which equals the record's uuid.
        source.update(_master_data(source_flow_dict, source["uuid"], source_version))
        del source["uuid"]
        target.update(_master_data(target_flow_dict, target["uuid"], target_version))
        del target["uuid"]

        # Modified or added attributes, then removed ones, so the comment is never
        # "Changed  from ..." when a key exists on only one side.
        changed = [key for key, value in target.items() if key not in source or source[key] != value]
        changed += [key for key in source if key not in target]
        attributes = ", ".join(changed)

        change['comment'] = add_comment(
            change.get("comment"),
            f"Changed {attributes} from {source_version} to {target_version}."
        )
        change["source_version"] = f"ecoinvent-{source_version}-biosphere"
        change["target_version"] = f"ecoinvent-{target_version}-biosphere"

    return changes

In [9]:
def get_filtered_rd_changes(from_v: str, to_v: str) -> list[dict]:
    """Return a filtered list of biosphere changes where the name or uuid changed.

    Reads the randonneur_data file
    ``ecoinvent-<from_v>-biosphere-ecoinvent-<to_v>-biosphere``. Only its
    ``replace`` records are used, or its ``update`` records if there is no
    ``replace`` key. A record is kept when its target sets a ``name`` or points
    at a different ``uuid``. Returns an empty list (after printing a notice) when
    the file has neither verb.
    """
    raw = registry.get_file(f'ecoinvent-{from_v}-biosphere-ecoinvent-{to_v}-biosphere')

    for verb in ('replace', 'update'):
        if verb in raw:
            entries = raw[verb]
            break
    else:
        print("No update changes found")
        return []

    def _name_or_uuid_changed(record: dict) -> bool:
        target = record['target']
        return 'name' in target or target['uuid'] != record['source']['uuid']

    return [record for record in entries if _name_or_uuid_changed(record)]

In [10]:
def remove_only_synonyms_change(data: list[dict]) -> list[dict]:
    """Drop change records whose source and target differ only in ``synonyms``.

    Records are compared with the ``synonyms`` key removed from both sides. The
    surviving records are returned (the same objects, in their original order).
    """
    def _without_synonyms(flow: dict) -> dict:
        return {key: value for key, value in flow.items() if key != "synonyms"}

    return [
        record
        for record in data
        if _without_synonyms(record['source']) != _without_synonyms(record['target'])
    ]

In [11]:
def apply_forward_change(data: list[dict], other: list[dict]) -> list[dict]:
    """Apply additional changes to get transitive change set.

    A ``data`` record is chained when its ``target["identifier"]`` equals the
    ``source["identifier"]`` of an ``other`` record. The ``data`` record is then
    updated in place:

    - it points at that record's target;
    - it takes over its ``target_version``;
    - it appends its ``comment``;
    - it multiplies in its ``conversion_factor``.

    Records without a target identifier, or without a later change, are kept as-is.

    Args:
        data: change records of the earlier step; modified in place.
        other: change records of the later step(s); each needs ``source["identifier"]``.

    Returns:
        ``data`` followed by the ``other`` records whose source identifier is not
        a source identifier in ``data``, excluding synonym-only changes.

    Raises:
        KeyError: if a matched ``other`` record lacks ``target``/``target_version``.
            The ``data`` record is left untouched in that case; previously the
            error was silently swallowed after the target had been overwritten.
    """
    other_mapping = {obj['source']['identifier']: obj for obj in other}

    for obj in data:
        # Only the look-up itself is best-effort: no identifier or no later change.
        if 'identifier' not in obj['target']:
            continue
        transitive = other_mapping.get(obj['target']['identifier'])
        if transitive is None:
            continue

        # Read required fields before mutating so a malformed record cannot
        # leave `obj` half-updated.
        new_target = transitive['target']
        new_target_version = transitive["target_version"]

        obj['target'] = new_target
        if transitive.get("comment"):
            obj['comment'] = add_comment(obj.get("comment"), addition=transitive["comment"])
        obj["target_version"] = new_target_version

        if "conversion_factor" in transitive:
            obj["conversion_factor"] = obj.get("conversion_factor", 1.) * transitive["conversion_factor"]

        logger.debug("Mapping change: %s", obj)

    # Later changes to flows that `data` does not touch are still needed in the chain.
    input_uuids = {obj['source'].get('identifier', None) for obj in data}
    extra = [obj for obj in other if obj['source']['identifier'] not in input_uuids]

    return data + remove_only_synonyms_change(extra)

In [12]:
def generate_transitive_datapackage(data: list[dict], source_id: str, end_target: str) -> rn.Datapackage:
    """Write and register the transitive correspondence ``source_id`` -> ``end_target``.

    ``data`` is added under the ``update`` verb. The package is saved as
    ``<name>.json``, a relative path, and registered in ``registry`` with
    ``replace=True``.

    Returns:
        The created datapackage.
    """
    name = f"ecoinvent-{source_id}-biosphere-ecoinvent-{end_target}-biosphere-transitive"
    flow_format = rn.MappingConstants.ECOSPOLD2_BIO_FLOWMAPPER

    datapackage = rn.Datapackage(
        name=name,
        source_id=f"ecoinvent-{source_id}-biosphere",
        target_id=f"ecoinvent-{end_target}-biosphere",
        description=f"Transitive ecoinvent elementary flow correspondence from {source_id} to {end_target}",
        contributors=[{"title": "Chris Mutel", "roles": ["author"], "path": "https://chris.mutel.org"}],
        mapping_source=flow_format,
        mapping_target=flow_format,
        version="1.0",
    )
    datapackage.add_data(verb="update", data=data)

    filename = f"{name}.json"
    datapackage.to_json(filename)
    registry.add_file(filename, replace=True)
    return datapackage

In [13]:
def generate_datapackage(data: list[dict], source_id: str, target_id: str) -> rn.Datapackage:
    """Write and register the single-step correspondence ``source_id`` -> ``target_id``.

    ``data`` is added under the ``update`` verb. The package is saved as
    ``<name>.json``, a relative path, and registered in ``registry`` with
    ``replace=True``.

    Returns:
        The created datapackage.
    """
    name = f"ecoinvent-{source_id}-biosphere-ecoinvent-{target_id}-biosphere"
    flow_format = rn.MappingConstants.ECOSPOLD2_BIO_FLOWMAPPER

    datapackage = rn.Datapackage(
        name=name,
        source_id=f"ecoinvent-{source_id}-biosphere",
        target_id=f"ecoinvent-{target_id}-biosphere",
        description=f"ecoinvent elementary flow correspondence from {source_id} to {target_id}",
        contributors=[{"title": "Chris Mutel", "roles": ["author"], "path": "https://chris.mutel.org"}],
        mapping_source=flow_format,
        mapping_target=flow_format,
        version="1.0",
    )
    datapackage.add_data(verb="update", data=data)

    filename = f"{name}.json"
    datapackage.to_json(filename)
    registry.add_file(filename, replace=True)
    return datapackage

In [14]:
# Show the labels currently known to the registry, alphabetically.
sorted(registry)

['Flowmapper-standard-units-harmonization',
 'SimaPro-2025-ecoinvent-3.12-context',
 'SimaPro-9-ecoinvent-3.8-biosphere',
 'SimaPro-9-ecoinvent-3.8-biosphere-manual-matches',
 'SimaPro-9-ecoinvent-3.9-biosphere',
 'SimaPro-9-ecoinvent-3.9-biosphere-manual-matches',
 'agribalyse-3.1.1-biosphere-ecoinvent-3.8-biosphere',
 'agribalyse-3.1.1-delete-aggregated-ecoinvent',
 'agribalyse-3.1.1-ecoinvent-3.10-biosphere-manual-matches',
 'agribalyse-3.1.1-restore-simapro-ecoinvent-names',
 'agrifootprint-2022-delete-aggregated-ecoinvent',
 'agrifootprint-2022-ecoinvent-3.10-biosphere',
 'agrifootprint-2022-ecoinvent-3.8-biosphere',
 'agrifootprint-2022-restore-simapro-ecoinvent-names',
 'ecoinvent-2.2-biosphere-context-ecoinvent-3.0-biosphere-context',
 'ecoinvent-2.2-biosphere-ecoinvent-3.0-biosphere',
 'ecoinvent-2.2-biosphere-ecoinvent-3.12-biosphere-transitive',
 'ecoinvent-3.01-biosphere-ecoinvent-3.1-biosphere',
 'ecoinvent-3.01-biosphere-ecoinvent-3.12-biosphere-transitive',
 'ecoinvent-3

In [15]:
# Release pairs to process, newest first: the main loop chains each step onto
# the transitive result of the step processed before it.
# - Steps with "supplement": True start from the randonneur_data file
#   `ecoinvent-<rd_source>-biosphere-ecoinvent-<rd_target>-biosphere` and are
#   completed with the master data of the `ei_*` releases.
# - The remaining steps are derived by diffing the master data of `ei_source`
#   and `ei_target` directly.
# NOTE(review): the newest step ends at 3.11 while `end_target` is "3.12";
# confirm 3.11 -> 3.12 has no elementary-flow changes, or add that step.
previous = None
config = [
    {"rd_source": "3.10.1", "ei_source": "3.10.1", "rd_target": "3.11", "ei_target": "3.11", "supplement": True},
    {"rd_source": "3.9.1", "ei_source": "3.9.1", "rd_target": "3.10", "ei_target": "3.10.1", "supplement": True},
    {"rd_source": "3.8", "ei_source": "3.8", "rd_target": "3.9", "ei_target": "3.9.1", "supplement": True},
] + [
    {"ei_source": ei_source, "ei_target": ei_target}
    for ei_source, ei_target in [
        ("3.7", "3.8"),
        ("3.6", "3.7"),
        ("3.5", "3.6"),
        ("3.4", "3.5"),
        ("3.3", "3.4"),
        ("3.2", "3.3"),
        ("3.1", "3.2"),
        ("3.01", "3.1"),
    ]
]
end_target = "3.12"

In [16]:
# Walk the release pairs newest-first, chaining each step's changes onto the
# transitive change set built in the previous iteration.
# Reset the chaining state here so re-running this cell on its own does not
# chain onto a stale `previous` left over from an earlier run.
previous = None

for step in tqdm(config):
    if step.get("supplement"):
        # Published randonneur_data change list, completed with master data.
        data = supplement_changes(
            get_filtered_rd_changes(step["rd_source"], step["rd_target"]),
            step["ei_source"],
            step["ei_target"],
        )
    else:
        # No published change list: diff the two releases' master data directly.
        data = generate_changes(step["ei_source"], step["ei_target"])
        # Only these derived single-step mappings are also written on their own.
        if data:
            generate_datapackage(deepcopy(data), step["ei_source"], step["ei_target"])

    if previous is not None:
        data = apply_forward_change(data, previous)

    generate_transitive_datapackage(deepcopy(data), step["ei_source"], end_target)
    previous = data

100%|█████████████████████████████████████████████████████████████████████| 11/11 [00:06<00:00,  1.78it/s]


In [17]:
# `replace` records of the randonneur_data ecoinvent 2.2 -> 3.0 correspondence.
data_22 = registry.get_file(
    'ecoinvent-2.2-biosphere-ecoinvent-3.0-biosphere'
)['replace']

# ecoinvent 3.01 elementary flows grouped by name; one name can map to several
# flows (e.g. in different contexts), hence a list per name.
data_301 = defaultdict(list)
for obj in get_elem_flow_data("3.01"):
    data_301[obj['name']].append(obj)

In [18]:
# Turn the 2.2 -> 3.0 name changes into records against ecoinvent 3.01 flows:
# every 3.01 flow carrying the new name gets a record whose source is the old
# 2.2 name in that flow's context.
changes = []
seen_name_pairs = set()

for line in data_22:
    s_name, t_name = line['source']['name'], line['target']['name']
    # Skip unchanged names. Also skip an (old, new) pair already handled: it can
    # appear several times (presumably once per 2.2 compartment) and would
    # otherwise emit identical duplicate records.
    if s_name == t_name or (s_name, t_name) in seen_name_pairs:
        continue
    seen_name_pairs.add((s_name, t_name))

    for obj in data_301.get(t_name, []):
        source = {
            "name": s_name,
            "context": obj["context"],
        }
        changes.append({
            "source": source,
            "target": obj,
            "comment": "Name change from ecoinvent 2.2 to 3.01",
            "source_version": "ecoinvent-2.2-biosphere",
            "target_version": "ecoinvent-3.01-biosphere",
        })

In [19]:
# Chain the 2.2 name changes onto `previous` (the transitive 3.01 -> end_target
# change set left by the main loop), then write and register the 2.2 package.
data = apply_forward_change(changes, previous)
dp = generate_transitive_datapackage(data, source_id="2.2", end_target=end_target)