In [None]:
%load_ext autoreload
%autoreload 2
%config Completer.use_jedi = False

In [None]:
from IPython.display import clear_output

In [None]:
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams['mathtext.fontset'] = 'stix'
mpl.rcParams['font.family'] = 'STIXGeneral'
mpl.rcParams['text.usetex'] = False
plt.rc('xtick', labelsize=12)
plt.rc('ytick', labelsize=12)
plt.rc('axes', labelsize=12)
mpl.rcParams['figure.dpi'] = 300

In [None]:
# might need to install a few of these
import pickle
import re
from pathlib import Path
import numpy as np
from rdkit import Chem
import pubchempy as pcp
import tqdm
import json
import pandas as pd
from PyAstronomy.pyasl import broadGaussFast, equidistantInterpolation
from scipy.signal import find_peaks
from scipy.interpolate import InterpolatedUnivariateSpline
from sklearn.decomposition import PCA

Download the required package for postprocessing XAS.

In [None]:
!curl -L -O https://github.com/matthewcarbone/NSLS-II-ISS-xview/archive/master.zip
!unzip master.zip
!rm master.zip
clear_output()

In [None]:
import sys
sys.path.append("NSLS-II-ISS-xview-main")
from xview.xasproject.xasproject import XASDataSet

Need this numpy encoder for serializing some of the data later on.

In [None]:
class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)

# Parse the data

In [None]:
# Not super proud of this code but it does work
def parse_file(fname):
    
    with open(fname, "r") as f:
        lines = [xx.strip() for xx in f.readlines()]
        
    if len(lines) == 0:
        return None, None

    metadata = {"total": []}
    data = []
    for ii, line in enumerate(lines):
        
        if "Edge" in line:
            if "C " not in line and "N " not in line and "O " not in line:
                return None, None
            if "1s" not in line:
                return None, None
            metadata["edge"] = line.strip()
    
        if "Name:" in line:
            metadata["name"] = line.split("Name: ")[1].strip()
            
        if "Alpha Formula:" in line:
            metadata["alpha formula"] = line.split("Alpha Formula:")[1].strip()
            
        if "Formula:" in line:
            metadata["formula"] = line.split("Formula:")[1].strip()

        if "SMILES:" in line:
            metadata["SMILES"] = line.split("SMILES:")[1].strip()

        if line == "":
            ii += 1
            break
        if line[0] == "*":
            metadata["total"].append(line)
            continue
        break
    
    for jj in range(ii, len(lines)):
        data.append(lines[jj])

    if "\t" in data[0]:
        data = [[float(yy.strip()) for yy in xx.split("\t")] for xx in data]
    else:
        data = [[float(yy.replace(",", "").replace("\t", " ")) for yy in xx.split(" ") if yy != ""] for xx in data]

    L = len(data[0])
    new_data = []
    for line in data:
        if len(line) < L:
            line = line + [np.nan] * (L - len(line))
        new_data.append(line)
    
    
    return np.array(new_data), metadata

In [None]:
def parse_all_data(root="experiment/data", require_smiles=False):
    files = Path(root).glob("*.os")
    data = {}
    metadata = {}
    for file in files:
        if "ab-0nokk" in file.stem:
            continue
        d, m = parse_file(file)
        if d is None:
            continue
        if require_smiles and (m.get("SMILES") in [None, "null"]):
            continue
            
        data[file.stem] = d
        metadata[file.stem] = m
    return data, metadata

In [None]:
def name_to_mod_name(name):
    return re.sub(r"([a-z])\-([a-z])", r"\1\2", name.lower().replace("- ", "-").replace(" ", "-"), 0, re.IGNORECASE)

In [None]:
def formula_getter(formula):
    if formula == "":
        return []
    try:
        return pcp.get_compounds(formula, "formula")
    except:  # God I hate myself for doing this
        return []

In [None]:
def get_recs(unique_x_to_compound, name):
    cs = unique_x_to_compound.get(name)
    if cs is None or len(cs) == 0:
        return None
    return cs[0].canonical_smiles

In [None]:
def insert_SMILES_to_file(path, write_to, smiles):
    
    assert path != write_to
    
    with open(path, "r") as f:
        lines = f.readlines()
    
    replace_at = None
    cc = 0
    current_line = lines[cc]
    
    while "*" == current_line[0]:
        # print(current_line.lower())
        if "alpha formula" in current_line.lower():
            replace_at = cc
            break
        cc += 1
        current_line = lines[cc]
        
    if replace_at is None:
        raise ValueError(f"replace_at is None for some reason; check {path}")
    
    lines.insert(cc + 1, f"* SMILES: {smiles}\n")
    
    with open(write_to, "w") as f:
        for l in lines:
            f.write(l)

In [None]:
def replace_SMILES_in_file(path, smiles):
    write_to = path
    
    with open(path, "r") as f:
        lines = f.readlines()
    
    replace_at = None
    cc = 0
    current_line = lines[cc]
    
    while "*" == current_line[0]:
        # print(current_line.lower())
        if "SMILES:" in current_line:
            replace_at = cc
            break
        cc += 1
        current_line = lines[cc]
        
    if replace_at is None:
        raise ValueError(f"replace_at is None for some reason; check {path}")
    
    lines[cc] = f"* SMILES: {smiles}\n"

    with open(write_to, "w") as f:
        for l in lines:
            f.write(l)

# Get information about the structures using pcp

In [None]:
data, metadata = parse_all_data()

In [None]:
# unique_names = set([value["name"] for value in metadata.values()])
# unique_formulas = set([value["formula"].strip() for value in metadata.values()])
# unique_alpha_formulas = set([value["alpha formula"].strip() for value in metadata.values()])

In [None]:
# Only need to run this once thank goodness
# it definitely was not the most efficient way to do it but I'm not going to think about it right now!

# Run this below:
# unique_names_to_compounds = {key: pcp.get_compounds(name_to_mod_name(key), "name") for key in unique_names}
# unique_formulas_to_compound = {formula: formula_getter(formula) for formula in unique_formulas}
# unique_alpha_formulas_to_compound = {formula: formula_getter(formula) for formula in unique_alpha_formulas}
# import pickle
# pickle.dump(unique_names_to_compounds, open("unique_names_to_compounds.pkl", "wb"), protocol=5)
# pickle.dump(unique_formulas_to_compound, open("unique_formulas_to_compounds.pkl", "wb"), protocol=5)
# pickle.dump(unique_alpha_formulas_to_compound, open("unique_alpha_formulas_to_compounds.pkl", "wb"), protocol=5)

In [None]:
unique_formulas_to_compound = pickle.load(open("experiment/unique_formulas_to_compounds.pkl", "rb"))
unique_names_to_compound = pickle.load(open("experiment/unique_names_to_compounds.pkl", "rb"))
unique_alpha_formulas_to_compound = pickle.load(open("experiment/unique_alpha_formulas_to_compounds.pkl", "rb"))

# Manually check all SMILES

Using a combination of [openmolecules.org](https://openmolecules.org/name2structure.html), Wikipedia, public chemistry databases such as pubchem (NIH), etc. 

In [None]:
data, metadata = parse_all_data()

In [None]:
keys = sorted(data.keys())

In [None]:
cache = {}

for ii, k in enumerate(keys):
    d = data[k]
    md = metadata[k]
    
    source = f"experiment/data/{k}.os"
    target = f"experiment/data_with_smiles/{k}.os"
    
    name = md["name"].strip()
    form = md["formula"].strip()
    aform = md["alpha formula"].strip()
    print(f"{ii+1:03}/{len(keys)}: {k} - {name} - {form} - {aform}")
    
    if Path(target).exists():
        print(f"target file exists, continuing!")
        print("-" * 20)
        continue

    smiles = cache.get(name)
    if smiles is not None:
        print(f"{k} - SMILES: {smiles}")
        print(f"SMILES from name {name} already computed ({smiles}), creating file automatically!")
        insert_SMILES_to_file(source, target, smiles)
        print("-" * 20)
        continue
    
    smiles_names = get_recs(unique_names_to_compound, name)        
    smiles_form = get_recs(unique_formulas_to_compound, form)
    smiles_aform = get_recs(unique_alpha_formulas_to_compound, aform)
    
    if smiles_names == smiles_form == smiles_aform:
        print("ALL GUESSES AGREE")
    else:
        if smiles_names == smiles_form and smiles_names is not None:
            print("name and formula agree")
        if smiles_names == smiles_aform and smiles_names is not None:
            print("name and alpha agree")
        if smiles_form == smiles_aform and smiles_form is not None:
            print("both formulas agree")
    
    print(f"GUESSES - {smiles_names} - {smiles_form} - {smiles_aform}")
    user_smiles = input("write correct smiles")
    
    if user_smiles == smiles_names == smiles_form == smiles_aform:
        print("USER PROVIDED AGREES WITH ALL GUESSES")
    else:
        if user_smiles == smiles_names:
            print("user provided and name->SMILES agree")
        if user_smiles == smiles_form:
            print("user provided and formula->SMILES agree")
        if user_smiles == smiles_aform:
            print("user provided and alpha->SMILES agree")
        
        
    
    insert_SMILES_to_file(source, target, user_smiles)
    cache[name] = user_smiles
    print(f"wrote {user_smiles} from {source} -> {target}")
    
    print("-" * 20)

# Triple check

In the first pass, I labeled many of the datapoints as `null` for various reasons. I'm going to go back through now and modify those. Many times, I did this because the compounds appear to have heavy transition metals or the like. Some structures were validated against the [literature itself](https://amo-csd.lbl.gov/downloads/bib-inner-shell-by-hitchcock.pdf).

In [None]:
data, metadata = parse_all_data("experiment/data_with_smiles")

In [None]:
keys = sorted(data.keys())

In [None]:
cc = 0
for md in metadata.values():
    if md["SMILES"] == "null":
        cc+=1

In [None]:
print(f"null: {cc/len(metadata)*100:.2f}%")
print(f"So far, total usable data is {len(metadata)-cc}")

In [None]:
cache = {}

for ii, k in enumerate(keys):
    md = metadata[k]
    
    name = md["name"].strip()
    form = md["formula"].strip()
    aform = md["alpha formula"].strip()
    
    source = f"experiment/data_with_smiles/{k}.os"
    
    if md["SMILES"] != "null":
        continue

    smiles = cache.get(name)
    if smiles is not None:
        print(f"SMILES from name {name} already computed ({smiles}), modifying file automatically!")
        replace_SMILES_in_file(source, smiles)
        continue

    name = md["name"].strip()
    form = md["formula"].strip()
    aform = md["alpha formula"].strip()
    print(f"{ii+1:03}/{len(keys)}: {k} - {name} - {form} - {aform}")

    smiles = input()
    
    replace_SMILES_in_file(source, smiles)
    print(f"wrote {smiles} from {source} -> {target}")
    
    cache[name] = smiles

# Load in the final dataset

In [None]:
raw_data, metadata = parse_all_data("experiment/data_with_smiles", require_smiles=True)

In [None]:
with open("experiment/data_with_smiles_unique_smiles.smi", "r") as f:
    unique_smiles = f.readlines()
unique_smiles = [u.strip() for u in unique_smiles]

Run this on the command line: `obabel -ismi data_with_smiles_unique_smiles.smi -ofpt -xfFP4 -xs > "data_with_smiles_unique_smiles_functional_groups.txt"`, and load in the data (this is largely just from script `00` in this repository).

In [None]:
with open("experiment/data_with_smiles_unique_smiles_functional_groups.txt", "r") as f:
    functional_groups = f.readlines()

# Every other line is just a ">"
functional_groups = functional_groups[1::2]

In [None]:
# Construct a dictionary from this
functional_groups = {
    smile: group.strip().split()
    for smile, group in zip(unique_smiles, functional_groups)
}

Now lets find the unique functional groups.

In [None]:
all_functional_groups_enumerated = [
    g for groups in functional_groups.values() for g in groups
]
all_unique_functional_groups = sorted(
    list(set(all_functional_groups_enumerated))
)

Create the index. This will map SMILES to whether or not C, N or O is in the molecule, and the binary indicator of each functional group which is present in the molecule.

In [None]:
index = {
    "SMILES": [],
    "C": [],
    "N": [],
    "O": [],
}
index = {**index, **{fg: [] for fg in all_unique_functional_groups}}

In [None]:
for smile in unique_smiles:
    
    index["SMILES"].append(smile)

    for key in ["C", "N", "O"]:
        index[key].append(int(key.lower() in smile.lower()))

    for fg in all_unique_functional_groups:
        index[fg].append(int(fg in functional_groups[smile]))

In [None]:
index = pd.DataFrame(index)

In [None]:
# index.to_csv("experiment/index.txt")

The targets are the indicator of whether or not a functional group of that type exists on the molecule. This maps the SMILES of the molecule to the functional groups.

In [None]:
targets = {row["SMILES"]: list(row.to_numpy()[4:]) for _, row in index.iterrows()}

# Define helper function for processing loop

In [None]:
def fill_I(I2, I1, alpha=0.5):
    return 2.0 * I2 - I1 + alpha * (1.0 - I2)

In [None]:
def process(
    grid=None,
    element="C",
    a_norm1=50,
    a_norm2=600,
    debug=None,
    max_mu=25,
    max_mu_after_processing=5,
    max_energy=400,
    metadata=metadata,
    index=index,
    raw_data=raw_data,
    feff_edge_location=277.713567839196,
    alpha=0.05,
):

    # Grid is the empirical guess from experiment
    # By default, we use the FEFF grid as a guess

    feff_grid = np.loadtxt(f"data/{element.lower()}_grid.txt")

    if grid is None:
        grid = feff_grid.copy()

    data = {
        "data": {},
        "grid": grid,
        "feff_grid": feff_grid,
        "errors": {},
        "xview.xasproject_params": {
            "a_norm1": a_norm1,
            "a_norm2": a_norm2,
        }
    }

    targets = {row["SMILES"]: list(row.to_numpy()[4:]) for _, row in index.iterrows()}

    cc = 0
    for key, value in tqdm.tqdm(metadata.items()):
    
        if f" {element} " not in value["edge"]:
            continue
    
        d = raw_data[key]
        smiles = value["SMILES"]
        target = targets[smiles]
        
        try:
            a=XASDataSet(energy=d[:, 0], mu=d[:, 1])
            a.norm1=a_norm1
            a.norm2=a_norm2
            
            a.normalize_force()
            # a.normalize()
    
        except ValueError:
            data["errors"][key] = "XASDataSet"
            continue
    
        mu = np.round(a.flat.squeeze(), 6)
        energy = np.round(a.energy.squeeze(), 6)
        
        energy, mu = equidistantInterpolation(energy, mu, dxmode=0.19)
        
        mu = broadGaussFast(energy, mu, sigma=0.5, edgeHandling="firstlast")
        
        if np.any(mu > max_mu):
            data["errors"][key] = f"Intensity>{max_mu}"
            continue

        if np.any(energy > max_energy):
            data["errors"][key] = f"Energy>{max_energy}"
            continue
    
        spline = InterpolatedUnivariateSpline(energy.squeeze(), mu.squeeze(), k=1, ext=1)
        mu2 = spline(grid).squeeze()
    
        ii = len(mu2) - 1
        while mu2[ii] == 0:
            mu2[ii] = np.nan
            ii -= 1
        
        # peaks = find_peaks(mu, height=0.5)
        # first_peak_index = peaks[0][0]
        # first_peak_energy = energy[first_peak_index]
        # energy -= first_peak_energy
        
        data["data"][key] = {
            "spectrum": mu2,
            "original_spectrum": mu,
            "original_energy": energy,
            "smiles": smiles,
            "target": target,
        }
        
    
        if debug is not None and cc > debug:
            break
    
        cc += 1

    for key, value in tqdm.tqdm(data["data"].items()):
    
        spectrum = value["spectrum"]
        for ii, (e, s) in enumerate(zip(grid, spectrum)):
            if s > 1:
                break
    
        # Magic number here is from the average FEFF spectrum where it crosses 1 the first time
        value["shifted_grid"] = grid - grid[ii] + feff_edge_location
        if np.any(value["shifted_grid"] < -50):
            data["errors"][key] = "ShiftError"
    
        infilled_spectrum = spectrum.copy()
        for ii in range(2, len(infilled_spectrum)):
            if np.isnan(infilled_spectrum[ii]):
                infilled_spectrum[ii] = fill_I(infilled_spectrum[ii - 1], infilled_spectrum[ii - 2], alpha=alpha)
    
        spline = InterpolatedUnivariateSpline(value["shifted_grid"].squeeze(), infilled_spectrum.squeeze(), k=1, ext=3)  # ValueError here
        mu2 = spline(feff_grid).squeeze()

        # Check for any remaining background
        p = np.polyfit(feff_grid[-5:], mu2[-5:], deg=1)
        # print(p)
        slope = p[0]
        yint = p[1]
        line = feff_grid * p[0] + p[1]
        mu2 = mu2 / line

        if np.sum(mu2 < 0) > 50 or np.any(mu2[:5] > 0.5) or np.any(mu2 > max_mu_after_processing):
            data["errors"][key] = "IntensityMagnitudeError"

        
        # Align tail to 1
        # tail_value = mu2[-20:]
        # mu2 = mu2 / tail_value.mean()
        
        value["infilled_spectrum"] = mu2

    return data

# Carbon K-edge

In [None]:
data_C = process(
    grid=np.linspace(280, 340, 200),
    element="C",
    # debug=20,
    a_norm1=60,
    a_norm2=100,
    max_mu=25,
    max_mu_after_processing=5,
    max_energy=400,
    feff_edge_location=277.71
)

Special postprocessing for C since I cannot seem to get the parameters just right.

In [None]:
for key, value in data_C["data"].items():
    if key in data_C["errors"]:
        continue
    spectrum = value["infilled_spectrum"]
    if np.any(spectrum[50:]<0.75):
        data_C["errors"][key] = "IntensityMagnitudeError"
    if np.any(spectrum[-50:]>1.05):
        data_C["errors"][key] = "IntensityMagnitudeError"

In [None]:
with open("experiment/c_exp.json", "w") as f:
    json.dump(data_C, f, cls=NumpyEncoder)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(3, 2))

for key, value in data_C["data"].items():
    if key in data_C["errors"]:
        continue
    spectrum = value["infilled_spectrum"]
    if np.any(spectrum[50:]<0.75):
        continue
    if np.any(spectrum[-50:]>1.05):
        continue
    grid = data_C["feff_grid"]
    
    ax.plot(grid, spectrum, alpha=1, linewidth=0.5, label=value["smiles"])
    # ax.plot(value["original_energy"], value["original_spectrum"], color="black", alpha=0.5, linewidth=0.5)

# ax.set_xlim(280, 330)
# ax.set_ylim(0, 2)
ax.axhline(1, zorder=-1, linewidth=0.5, linestyle="--", color="black")
# ax.legend(bbox_to_anchor=(1, 1.05))
    
plt.show()

# Nitrogen K-edge

In [None]:
data_N = process(
    grid=np.linspace(380, 430, 200),
    element="N",
    # debug=20,
    a_norm1=50,
    a_norm2=100,
    max_mu=10,
    max_energy=500,
    feff_edge_location=399.34,
    alpha=5
)

In [None]:
with open("experiment/n_exp.json", "w") as f:
    json.dump(data_N, f, cls=NumpyEncoder)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(3, 2))

for key, value in data_N["data"].items():
    if key in data_N["errors"]:
        continue
    spectrum = value["original_spectrum"]
    grid = data_N["feff_grid"]
    ax.plot(grid, spectrum, alpha=1, linewidth=0.5, label=value["smiles"])
    # ax.plot(value["original_energy"], value["original_spectrum"], color="black", alpha=0.5, linewidth=0.5)

# ax.set_xlim(280, 330)
# ax.set_ylim(0, 2)
ax.axhline(1, zorder=-1, linewidth=0.5, linestyle="--", color="black")
# ax.legend(bbox_to_anchor=(1, 1.05))
    
plt.show()

# Oxygen K-edge

In [None]:
data_O = process(
    grid=np.linspace(440, 580, 200),
    element="O",
    # debug=20,
    a_norm1=50,
    a_norm2=100,
    max_mu=10,
    max_energy=700,
    feff_edge_location=530.17,
    alpha=0.2
)

In [None]:
with open("experiment/o_exp.json", "w") as f:
    json.dump(data_O, f, cls=NumpyEncoder)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(3, 2))

for key, value in data_O["data"].items():
    if key in data_O["errors"]:
        continue
    spectrum = value["infilled_spectrum"]
    grid = data_O["feff_grid"]
    # grid = value["original_energy"]
    ax.plot(grid, spectrum, alpha=1, linewidth=0.5, label=value["smiles"])
    # ax.plot(value["original_energy"], value["original_spectrum"], color="black", alpha=0.5, linewidth=0.5)

# ax.set_xlim(280, 330)
# ax.set_ylim(0, 2)
ax.axhline(1, zorder=-1, linewidth=0.5, linestyle="--", color="black")
# ax.legend(bbox_to_anchor=(1, 1.05))
    
plt.show()