In [None]:
import datetime
import json
import os
import uuid
from numbers import Number
from pathlib import Path
from typing import Optional

import bw2io as bi
import numpy as np
import pandas as pd
from bw2data.utils import recursive_str_to_unicode
from bw_processing.constants import DEFAULT_LICENSES
from lxml import objectify

# CAS Numbers

All information from https://www.cas.org/support/documentation/chemical-substances/checkdig

CAS numbers have the form `A`-`B`-`C`, where:

* `A` has between 2 and 7 digits
* `B` has 2 digits
* `C` is a single check digit

To calculate the check digit:

Each digit to the left of the check digit, *starting from the right* and ignoring hyphens, is given a weight corresponding to its ordinal position (1-indexed). The check digit is the ones digit of the sum of the weighted values. For example, for `107-07-3`, the sum would be:

$$ 1 \cdot 7 + 2 \cdot 0 + 3 \cdot 7 + 4 \cdot 0 + 5 \cdot 1 = 33$$

And the check digit would be 3 (the value in the ones position). Similarly, for `110-63-4`:

$$ 1 \cdot 3 + 2 \cdot 6 + 3 \cdot 0 + 4 \cdot 1 + 5 \cdot 1 = 24$$

And indeed we get 4 as the check digit.

In [None]:
# Reference elementary flow list (ecoinvent 3.7), read from ./inputs relative to
# the current working directory.
reference_csv = Path.cwd() / "inputs" / "ecoinventEFv3.7.csv"
reference = pd.read_csv(reference_csv)
reference

It seems like, at least in this reference data, the column `CAS No` is zero-padded (i.e. `A` is the full seven numbers, with zeros added), while `Second CAS` has the leading zeros removed.

In [None]:
# Show the reference rows whose `Second CAS` is the unpadded form '96-49-1'.
reference.loc[reference['Second CAS'] == '96-49-1']

We set up convenience functions for these two forms:

In [None]:
def check_digit(s):
    """Validate the check digit of a hyphenated CAS number string.

    Hyphens are ignored. The digits to the left of the final character are
    weighted 1, 2, 3, ... from right to left (at most nine digits are
    considered), and the ones digit of the weighted sum must equal the final
    character of ``s``.

    Returns ``None`` when the check digit matches.

    Raises:
        ValueError: If the computed check digit differs from the one in ``s``.
    """
    ERROR = "CAS Check Digit error: CAS '{}' has check digit of {}, but it should be {}"

    # Everything except the trailing check digit, hyphens removed.
    payload = s.replace("-", "")[:-1]
    total = 0
    for weight, char in enumerate(reversed(payload[-9:]), start=1):
        total += weight * int(char)

    expected = total % 10
    if expected != int(s[-1]):
        raise ValueError(ERROR.format(s, s[-1], expected))
                

def check_cas(s):
    """Check the format and check digit of a CAS number.

    Returns ``None`` for a falsy input (no CAS number given) and ``True`` when
    ``s`` contains exactly two hyphens and passes ``check_digit``. An
    ``AssertionError`` is raised for the wrong number of hyphens; errors from
    ``check_digit`` propagate unchanged.
    """
    if s:
        assert s.count("-") == 2
        check_digit(s)
        return True
    return None


def zero_pad_cas(s):
    """Left-pad a CAS number with zeros to a total length of 12 characters.

    Falsy inputs (``None`` or an empty string) are returned unchanged, as are
    strings that are already 12 characters or longer.
    """
    return s.rjust(12, "0") if s else s
    
    
def no_padding_cas(s):
    """Remove leading zeros from a CAS number.

    Falsy inputs (``None`` or an empty string) are returned unchanged.
    """
    if s:
        return s.lstrip("0")
    return s

Check our functions:

In [None]:
# Round-trip one known CAS number through both representations, then validate it.
padded, unpadded = "0000096-49-1", '96-49-1'
assert zero_pad_cas(unpadded) == padded
assert no_padding_cas(padded) == unpadded
assert check_cas(unpadded)

Should raise an error:

In [None]:
# Demonstrate the failure case without aborting "Run All": the check digit of
# '96-49-2' is wrong, so a ValueError is expected; catch it and show its message.
try:
    check_cas('96-49-2')
except ValueError as err:
    print(err)
else:
    raise AssertionError("Expected a check digit error for '96-49-2'")

Unfortunately, we also have some bad data (found because we check the check digit). Some of these values have been fixed in 3.9, but we will fix them in the other files as well.

This is a bit tricky, as ideally we would present the data strictly as given in the input files. However, these are cases where the provided values are simply false, and this could hinder matching, which is the main purpose of generating the elementary flow lists.

In [None]:
# Maps an incorrect CAS number found in the input data (unpadded form) to the
# corrected value; each entry names the substance and the source of the fix.
fixed_cas_values = {
    # Barite; https://pubchem.ncbi.nlm.nih.gov/compound/24414
    "7727-34-7": "7727-43-7",
    # Lutetium; https://pubchem.ncbi.nlm.nih.gov/compound/23929
    "439-94-3": "7439-94-3",
    # Ethylenediamine; https://pubchem.ncbi.nlm.nih.gov/compound/3301
    "117-15-3": "107-15-3",
    # Methyl amine; https://pubchem.ncbi.nlm.nih.gov/compound/6329
    "75-89-5": "74-89-5",
}

In [None]:
def fix_cas(s):
    """Normalize a raw CAS number and apply the known manual corrections.

    Surrounding whitespace and leading zeros are removed, then the result is
    looked up in ``fixed_cas_values``; values without a correction are
    returned in their normalized form. A falsy input yields ``None``.
    """
    stripped = s.strip() if s else None
    unpadded = no_padding_cas(stripped)
    return fixed_cas_values.get(unpadded, unpadded)

There are a few attributes which are not provided in the XML, and which we need to retrieve from the reference data. They are:

* `Class`: String, has 12 possible values, like `chemical` or `energy`
* `Preferred`: Mostly missing, but sometimes `0` or `1`
* `Second CAS`: List of strings, separated in the reference data by `;`

We can make lookup dictionaries using the name and context.

In [None]:
# Lookup key for each reference row: the (Flowable, Context) pair.
reference['Compound'] = [
    (flowable, context)
    for flowable, context in zip(reference['Flowable'], reference['Context'])
]

In [None]:
# One lookup dictionary per reference attribute, each keyed by the
# (Flowable, Context) tuples stored in the `Compound` column.
class_mapping, preferred_mapping, second_cas_mapping = (
    dict(zip(reference['Compound'], reference[column]))
    for column in ('Class', 'Preferred', 'Second CAS')
)

Change values in `preferred_mapping` to integers when possible:

In [None]:
# Values equal to 0 or 1 become plain integers; every other value is kept as is.
preferred_mapping = {
    compound: (int(flag) if flag in {0.0, 1.0} else flag)
    for compound, flag in preferred_mapping.items()
}

We need to clean up `second_cas_mapping` to remove duplicate entries and use a single, consistent separator.

In [None]:
def clean_cas_multiple(obj):
    """Split a ``;``-separated string of CAS numbers into a set of clean values.

    Each fragment is stripped of surrounding whitespace and of leading zeros.
    Duplicates collapse because the result is a set. Fragments that are empty
    after cleaning (for example from a trailing or doubled ``;``) are dropped
    so that no empty entry ends up in the result.

    Args:
        obj: The raw cell value: a string, ``None``, or a NaN number.

    Returns:
        A set of cleaned CAS strings, or ``None`` if ``obj`` is falsy or NaN,
        or if no non-empty fragment remains.
    """
    if not obj or (not isinstance(obj, str) and np.isnan(obj)):
        return None
    cleaned = {no_padding_cas(part.strip()) for part in obj.split(";")}
    # A blank fragment would otherwise show up as a spurious empty CAS number.
    cleaned.discard("")
    return cleaned or None

In [None]:
# Replace each raw `Second CAS` value with its cleaned form. This rebinds the
# same name, so the cell is meant to run once after the mapping is created.
second_cas_mapping = {
    compound: clean_cas_multiple(raw_value)
    for compound, raw_value in second_cas_mapping.items()
}

Code to read in the XML and format to UNEP format

In [None]:
def extract_flow_data(o, separator="|"):
    """Convert one elementary exchange element into a flat row dictionary.

    The CAS number is normalized with ``fix_cas`` and validated with
    ``check_cas`` before the row is built. ``Class``, ``Preferred`` and
    ``Second CAS`` are not read from the element; they are looked up in the
    reference mappings under the key ``(name, "compartment/subcompartment")``.

    Args:
        o: Element exposing ``get()``, ``iterchildren()`` and the children
            ``name``, ``unitName`` and ``compartment`` (presumably an
            ``lxml.objectify`` element for one exchange -- confirm with caller).
        separator: String used to join multi-valued fields (``Synonyms``,
            ``Context``, ``Second CAS``).

    Returns:
        dict: One row with the output column names as keys.

    Raises:
        ValueError / AssertionError: Propagated from ``check_cas`` when the
            CAS number is malformed.
    """
    synonym_tag = "{http://www.EcoInvent.org/EcoSpold02}synonym"

    cas = fix_cas(o.get("casNumber"))
    # Fail fast on bad CAS data instead of after assembling the whole row.
    check_cas(cas)

    compartments = (
        o.compartment.compartment.text,
        o.compartment.subcompartment.text,
    )
    # The reference mappings are keyed with a "/"-joined context.
    key = (o.name.text, "/".join(compartments))

    # The mapping stores sets; sort them so the joined string is identical on
    # every run (set iteration order of strings is not stable across processes).
    second_cas = sorted(second_cas_mapping.get(key) or ())

    return {
        "Flowable": o.name.text,
        'CAS No': zero_pad_cas(cas),
        "Formula": o.get("formula"),
        "Synonyms": separator.join(
            child.text
            for child in o.iterchildren()
            if child.tag == synonym_tag and child.text
        ),
        "Unit": o.unitName.text,
        "Class": class_mapping.get(key),
        "ExternalReference": None,
        'Preferred': preferred_mapping.get(key),
        "Context": separator.join(compartments),
        "Flow UUID": o.get("id"),
        "AltUnit": None,
        'Second CAS': separator.join(second_cas) or None,
    }
    
    
def extract_elem_flows_xml(fp, separator="|"):
    """Read an elementary exchanges XML file into a sorted ``DataFrame``.

    Every child of the XML root is converted with ``extract_flow_data``; the
    resulting rows are sorted by ``Flowable`` and then ``Context``.

    Args:
        fp: Path (``str`` or ``Path``) to the XML file.
        separator: Passed through to ``extract_flow_data`` for multi-valued
            fields.

    Returns:
        pd.DataFrame: One row per child element of the XML root.

    Raises:
        AssertionError: If ``fp`` is not an existing file.
    """
    fp = Path(fp)
    assert fp.is_file(), f"Not a file: {fp}"

    # Use a context manager so the handle is closed once parsing is done;
    # previously the opened file was never closed.
    with open(fp, encoding="utf-8") as xml_file:
        root = objectify.parse(xml_file).getroot()

    flow_data = recursive_str_to_unicode(
        [extract_flow_data(ds, separator=separator) for ds in root.iterchildren()]
    )
    return pd.DataFrame(sorted(flow_data, key=lambda x: (x['Flowable'], x['Context'])))

In [None]:
# Root directory holding one sub-directory per ecoinvent release. Set the
# ECOINVENT_DATA_DIR environment variable to point at your own copy; the
# original author's local path is kept as the fallback.
BASE_DATA_DIR = Path(
    os.environ.get("ECOINVENT_DATA_DIR")
    or "/Users/chrismutel/Sync/Documents/LCA/Ecoinvent/"
)

In [None]:
def to_directory_datapackage(
    dataframe: pd.DataFrame,
    dirpath: Path,
    name: str,  # Should agree with https://github.com/Depart-de-Sentier/happy_family/#short-and-unique-identifiers-for-database-releases
    description: Optional[str] = None,
    id_: Optional[str] = None,
    licenses: Optional[list] = None,
    separator: str = "|",
):
    """Write ``dataframe`` and its metadata as a directory data package.

    Creates ``dirpath`` (including missing parents) and writes two files into
    it: ``metadata.json`` describing the package and ``{name}.csv`` holding
    the data without the index.

    Args:
        dataframe: Table to export.
        dirpath: Target directory.
        name: Package name; also used as the CSV file stem.
        description: Optional free-text description stored in the metadata.
        id_: Package id; a random UUID hex string is generated when omitted.
        licenses: License list; ``DEFAULT_LICENSES`` is used when omitted.
        separator: In-field separator recorded in the resource metadata for
            the fields flagged ``'separated': True``. It is only recorded
            here; the data is assumed to have been joined with the same
            separator already -- confirm with caller.
    """
    # Naive UTC timestamp with a trailing "Z", built without the deprecated
    # datetime.utcnow().
    created = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    metadata = {
        "profile": "tabular-data-package",  # https://dataprotocols.org/tabular-data-package/
        "name": name,
        "description": description,
        "id": id_ or uuid.uuid4().hex,
        "licenses": licenses or DEFAULT_LICENSES,
        "created": created.isoformat("T") + "Z",
        "resources": [{
            "path": f"{name}.csv",
            "profile": "tabular-data-resource",
            "mediatype": "text/csv",
            # Record the separator actually requested, not a fixed "|".
            "separator": separator,
            "schema": {
                "fields": [
                    {'name': 'Flowable', 'type': 'string'},
                    {'name': 'CAS No', 'type': 'string'},
                    {'name': 'Formula', 'type': 'string'},
                    {'name': 'Synonyms', 'type': 'string', 'separated': True},
                    {'name': 'Unit', 'type': 'string'},
                    {'name': 'Class', 'type': 'string'},
                    {'name': 'ExternalReference', 'type': 'string'},
                    {'name': 'Preferred', 'type': 'boolean'},
                    {'name': 'Context', 'type': 'string', 'separated': True},
                    {'name': 'Flow UUID', 'type': 'string'},
                    {'name': 'AltUnit', 'type': 'string', 'separated': True},
                    {'name': 'Second CAS', 'type': 'string', 'separated': True},
                ]
            }
        }]
    }
    dirpath.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False emits non-ASCII characters verbatim, so the encoding
    # must be explicit; the context manager also closes the handle.
    with open(dirpath / "metadata.json", "w", encoding="utf-8") as metadata_file:
        json.dump(metadata, metadata_file, indent=2, ensure_ascii=False)
    dataframe.to_csv(dirpath / f"{name}.csv", index=False)

In [None]:
# Export one data package per ecoinvent release into ./outputs/<name>.
separator = "|"

for version in ("3.6", "3.7", "3.8", "3.9"):
    name = f"ecoinvent-{version}"
    master_data = BASE_DATA_DIR / version / "cutoff" / "MasterData"

    flows = extract_elem_flows_xml(
        fp=master_data / "ElementaryExchanges.xml",
        separator=separator,
    )
    to_directory_datapackage(
        dataframe=flows,
        dirpath=Path.cwd() / "outputs" / name,
        name=name,
        description=f"Elementary flow list as used in release {name}",
        separator=separator,
    )