# Match missing by validated CAS numbers

Validate and then use CAS numbers for matching where possible.

In [15]:
import pandas as pd
from pathlib import Path
from datetime import datetime, timezone
import numpy as np
from numbers import Number

In [3]:
unmatched_data_dir = (Path.cwd().parent / "Mapping" / "Output" / "Unmatched").resolve()
output_dir = (Path.cwd().parent / "Contribute").resolve()

Read input dataframes

In [4]:
sp = pd.read_csv(unmatched_data_dir / 'SimaProv9.4.csv')

In [5]:
ei = pd.read_csv(unmatched_data_dir / 'ecoinventEFv3.7.csv')

## Validating CAS numbers

Just because we have them doesn't make them correct.

Based on code from [happy_family](https://github.com/Depart-de-Sentier/happy_family/blob/main/Elementary%20flow%20lists/Generate%20elementary%20flow%20lists.ipynb).

In [29]:
def validate_cas(s):
    ERROR = "CAS Check Digit error: CAS '{}' has check digit of {}, but it should be {}"
    
    if isinstance(s, str):
        s = s.strip()
    if not s:
        return None
    elif isinstance(s, Number) and np.isnan(s):
        return None
    
    total = sum((a + 1) * int(b) for a, b in zip(range(9), s.replace("-", "")[-2::-1]))
    if not total % 10 == int(s[-1]):
        print("CAS not valid: {} ({})".format(s, ERROR.format(s, s[-1], total % 10)))
        return None
    return s
                

def check_cas(s):
    if not s:
        return None
    assert s.count("-") == 2
    check_digit(s)
    return True


def zero_pad_cas(s):
    if not s:
        return s
    zeros = "0" * (12- len(s))
    return zeros + s
    
    
def no_padding_cas(s):
    if not s:
        return s
    return s.lstrip("0")

In [32]:
ei["Valid CAS"] = ei["CASNo"].apply(validate_cas)
ei = ei[ei["Valid CAS"] != None]

In [33]:
sp["Valid CAS"] = sp["CAS No"].apply(validate_cas)
sp = sp[sp["Valid CAS"] != None]

Example of how to combine dataframes using [merge](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html). We already have these matches, this is only an example :)

In [34]:
df = sp.merge(ei, how="inner", on=["Valid CAS", "Context"])
len(df)

140547

Adjust columns to match expected format:

In [None]:
def fix_names_after_merge(df):
    mapping = {
        'Flow UUID': 'SourceFlowUUID', 
        'FlowUUID': 'TargetFlowUUID',  # Incorrect column header in provided ecoinvent data
        'Flowable_x': 'SourceFlowName', 
        'Flowable_y': 'TargetFlowName',
        'Unit_x': 'SourceUnit',
        'Unit_y': 'TargetUnit',
        'Context_x': 'SourceFlowContext',
        'Context_y': 'TargetFlowContext',
    }
    return df.rename(columns={k: v for k, v in mapping.items() if k in df.columns})

Add some useful columns.

* `author` is your name
* `notebook_name` is the name of this notebook; we can't figure this out automatically. It should normally start with `Match -`.
* `default_match_condition` is one of `=`, `~`, `<`, or `>`.

In [None]:
def add_common_columns(df, author, notebook_name, default_match_condition="="):
    df['SourceListName'] = 'SimaPro9.4'
    df['TargetListName'] = 'ecoinventEFv3.7'
    df['MatchCondition'] = default_match_condition
    df['Mapper'] = author
    df['MemoMapper'] = f'Automated match. Notebook: {notebook_name}'
    df['MemoSource'] = ''
    df['MemoTarget'] = ''
    df['MemoVerifier'] = ''
    df['LastUpdated'] = datetime.now(timezone.utc).astimezone().isoformat()
    df['Verifier'] = ''
    return df

Make sure the required columns are present

In [None]:
def check_required_columns(df):
    expected = set([     
        "SourceListName", "SourceFlowName", "SourceFlowUUID", "SourceFlowContext", "SourceUnit", 
        "MatchCondition", "TargetListName", "TargetFlowName", "TargetFlowUUID", 
        "TargetFlowContext", "TargetUnit", "Mapper", "Verifier", "LastUpdated", "MemoMapper", 
        "MemoVerifier", "MemoSource", "MemoTarget"
    ])
    given = set(df.columns)
    difference = expected.difference(given)
    if difference:
        print("Missing the following required columns:", difference)

Export the dataframe to the `contribute` directory. Please make your filename meaningful.

In [None]:
def export_dataframe(df, name):
    SPEC_COLUMNS = [
        "SourceListName", "SourceFlowName", "SourceFlowUUID", "SourceFlowContext", "SourceUnit", 
        "MatchCondition", "ConversionFactor", "TargetListName", "TargetFlowName", "TargetFlowUUID", 
        "TargetFlowContext", "TargetUnit", "Mapper", "Verifier", "LastUpdated", "MemoMapper", 
        "MemoVerifier", "MemoSource", "MemoTarget"
    ]
    
    df = df[[col for col in SPEC_COLUMNS if col in df.columns]]
    
    if not name.lower().endswith(".csv"):
        name += ".csv"
    
    df.to_csv(output_dir / name, index=False)