In [1]:
import pandas as pd
import numpy as np
import json
from utils.classes import *
from utils.exclude import *
import re
from collections import defaultdict

filename = r'data/solid-state_dataset_20200713.json'
filedata = open(filename, mode='r').read()
jsonParse = json.loads(filedata)

reactions = [from_dict(reaction, ReactionEntry) for reaction in jsonParse['reactions']]

In [2]:
def RemoveBadEntries(reactions: list,
                        min_precursors = 2,
                        remove_bad_doi = True,
                        remove_bad_target = True,
                        remove_bad_precursor = True, 
                        remove_duplicates_via_doi = True,
                        remove_invalid_coefficients_multiplicities = True,
                        use_bad_list = True,
                        remove_negative_coefficients = True, 
                        verbose_output = True) -> List[ReactionEntry]:
    
    """
    Filters out bad reaction entries from a given list based on various criteria.
    
    Parameters:
    reactions (list): List of ReactionEntry objects.
    min_precursors (int): Minimum number of precursors required. Default is 2.
    remove_bad_doi (bool): Flag to remove entries with bad DOIs. Default is True.
    remove_bad_target (bool): Flag to remove entries with bad targets. Default is True.
    remove_bad_precursor (bool): Flag to remove entries with bad precursors. Default is True.
    remove_duplicates_via_doi (bool): Flag to remove duplicate entries via DOI. Default is True.
    remove_invalid_coefficients_multiplicities (bool): Flag to remove entries with invalid coefficients or multiplicities. Default is True.
    use_bad_list (bool): Flag to use a predefined list of bad entries. Default is True.
    remove_negative_coefficients (bool): Flag to remove entries with negative coefficients. Default is True.
    verbose_output (bool): Flag to enable verbose output. Default is True.
    
    Returns:
    list: A list of filtered ReactionEntry objects.
    """
        
    filtered_reactions = []
    bad_list = ['*', '-', 'x', '+', '/', 'ac', '(2N)', '(3N)', '(4N)', '(5N)', '(6N)', '7LiOH', '2Ni(OH)2']
    isDigitRegex = re.compile(r'^-?\d+(\.\d+)?$')
    isNegativeRegex = re.compile(r'^(0*[1-9]\d*|0*\d*\.\d*[1-9])$')
    RegexSelected = isNegativeRegex
    if not remove_negative_coefficients:
        RegexSelected = isDigitRegex
    for reaction in reactions:
        rxn: ReactionEntry = reaction
        if (verbose_output): print(rxn.reaction_string, end='')
        if remove_bad_doi and rxn.doi in BAD_DOI: 
            if (verbose_output): print(": REJECTED DUE TO BAD DOI")
            continue
        if len(rxn.precursors) < min_precursors: 
            if (verbose_output): print(": REJECTED DUE TO LOW PRECURSOR COUNT")
            continue
        if remove_bad_target and any(target in BAD_TARGETS for target in (rxn.targets_string)):  
            if (verbose_output): print(": REJECTED DUE TO BAD TARGET")
            continue
        if remove_bad_precursor and  any(precursor.material_formula in BAD_PRECURSORS for precursor in rxn.precursors):  
            if (verbose_output): print(": REJECTED DUE TO BAD PRECURSOR")
            continue
        # if any([not bool(isDigitRegex.match(s.amount)) for s in rxn.reaction.left_side]):
        #     if (verbose_output): print(": REJECTED DUE TO UNKNOWN COEFFICIENT IN LHS")
        #     continue
        # if any([not bool(isDigitRegex.match(s.amount)) for s in rxn.reaction.right_side]):
        #     if (verbose_output): print(": REJECTED DUE TO UNKNOWN COEFFICIENT IN RHS")
        #     continue 
        if remove_invalid_coefficients_multiplicities and any([not bool(RegexSelected.match(s.amount)) for s in rxn.reaction.left_side]):
            if (verbose_output): print(": REJECTED DUE TO INVALID COEFFICIENT IN LHS")
            continue
        if remove_invalid_coefficients_multiplicities and any([not bool(RegexSelected.match(s.amount)) for s in rxn.reaction.right_side]):
            if (verbose_output): print(": REJECTED DUE TO INVALID COEFFICIENT IN RHS")
            continue

        found_bad = False
        if use_bad_list: 
            for bad in bad_list:
                if(any(bad in target_string for target_string in rxn.targets_string)) \
                or any(bad in precursor.material_formula for precursor in rxn.precursors):
                    found_bad = True
        
        if found_bad:  
            if (verbose_output): print(": REJECTED CHARACTER FROM BAD LIST")
            continue
        else:
            if (verbose_output): print(": SELECTED") 
            filtered_reactions.append(rxn)
    print("Filtered", len(filtered_reactions), "reactions out of total", len(reactions))
    return filtered_reactions

def NormalizePrecursors(reactions: list) -> List[ReactionEntry]:
    
    """
    Normalizes precursor materials in the given list of reactions based on predefined replacements.
    
    Parameters:
    reactions (list): List of ReactionEntry objects.
    
    Returns:
    list: A list of ReactionEntry objects with normalized precursors.
    """

    PrecursorMaterialReplacements = {}
    for key, value in PREC_REPLACEMENTS.items():
        PrecursorKey    = [ material for reaction in reactions for material in reaction.precursors if material.material_formula == key]
        PrecursorValue  = [ material for reaction in reactions for material in reaction.precursors if material.material_formula == value]
        filtered_reactions = [reaction for reaction in reactions for material in reaction.precursors if key in material.material_formula]
        number_replacements = len(filtered_reactions)
        if(len(PrecursorKey) > 0):
            for rxn in filtered_reactions:
                for prec in rxn.precursors:
                    if prec.material_formula == PrecursorKey[0].material_formula: 
                        #print("replace here")
                        prec = PrecursorValue[0]
                    if PrecursorKey[0].material_formula in rxn.reaction_string:
                        rxn.reaction_string.replace(PrecursorKey[0].material_formula, PrecursorValue[0].material_formula)
                    # TODO: You still have to replace Formula parts in rxn.reaction. Find a proposal that works.
            PrecursorMaterialReplacements[PrecursorKey[0]] = PrecursorValue[0]
            print("Processed:", key, '=', value, ": replaced", number_replacements, " places")
        else: print("Skipped:", key)
    return reactions

def RemoveDuplicates(reactions: list) -> List[ReactionEntry]:
    
    """
    Placeholder for function to remove duplicates from the list of reactions.
    TODO: Integrate all other "Remove*Duplicates" Functions

    Parameters:
    reactions (list): List of ReactionEntry objects.
    
    Returns:
    list: A list of ReactionEntry objects without duplicates.
    """

    return reactions

def RemoveDOIDuplicates(reactions: list) -> List[ReactionEntry]:
    
    """
    Removes duplicate reactions based on DOI and reaction string.
    
    Parameters:
    reactions (list): List of ReactionEntry objects.
    
    Returns:
    list: A list of ReactionEntry objects without DOI duplicates.
    """

    # Assuming 'reactions' is your list of ReactionEntry objects

    # Create a defaultdict to store entries grouped by (doi, reaction_string)
    entry_dict = defaultdict(list)
    for entry in reactions:
        entry_dict[(entry.doi, entry.reaction_string)].append(entry)

    # Filter out entries where there are duplicates (keep only the first occurrence)
    filtered_reactions_doi = []
    seen_keys = set()
    for entry in reactions:
        key = (entry.doi, entry.reaction_string)
        if key not in seen_keys:
            seen_keys.add(key)
            filtered_reactions_doi.append(entry)
    print("Filtered", len(filtered_reactions_doi), "reactions out of total", len(reactions))
    return filtered_reactions_doi

def RemoveNodeMatchDuplicates(reactions: list, verbose_output=False) -> tuple[List[ReactionEntry], defaultdict(list)]:
    
    """
    Removes duplicates based on node matches in the reaction entries.
    
    Parameters:
    reactions (list): List of ReactionEntry objects.
    
    Returns:
    list: A list of ReactionEntry objects without node match duplicates.
    """
    # Dictionary to store entries grouped by (right_side_tuple, amount_tuple)
    duplicate_entries = defaultdict(list)
    filtered_reactions_dupForm = []
    for entry in reactions:
        # Create a tuple representation of right_side
        right_side_tuple = tuple((part.amount, part.material) for part in entry.reaction.right_side)

        # Create sets of (amount, material) tuples for target and precursors
        target_materials = {(comp.amount, comp.formula) for comp in entry.target.composition}
        precursor_materials =  {(mat.amount, mat.formula) for composition in (material.composition for material in entry.precursors) for mat in composition}

        # Create a tuple for (target materials, precursor materials)
        materials_tuple = (frozenset(target_materials), frozenset(precursor_materials))

        key = (right_side_tuple, materials_tuple)
        duplicate_entries[key].append(entry)

    # Now, find and print duplicates
    seen_keys = set()
    for key, entries in duplicate_entries.items():
        if len(entries) > 1:
            if key not in seen_keys:
                seen_keys.add(key)
                filtered_reactions_dupForm.append(entry)
            if(verbose_output): print(len(entries), "\tDuplicates for: ", end='')
            if(verbose_output): print(f"Right Side: {key[0]}", end='')
            if(verbose_output): print("Target materials:", end='')
            if(verbose_output): print(key[1][0], end='')
            if(verbose_output): print("Precursor materials:", end='')
            if(verbose_output): print(key[1][1], end='')
            if(verbose_output): print()
            for i in range(len(entries)):
                entry = entries[i]
                if(verbose_output): print("Entry #{}: ".format(i), end='')
                calc_operations = [op for op in entry.operations if "calc" in op.token]
                if(verbose_output): print("No. Calcination Operations: {}, ".format(len(calc_operations)))
                for op in calc_operations:
                    if(verbose_output): print(op)
    print("Filtered", len(filtered_reactions_dupForm), "reactions out of total", len(reactions))
    return filtered_reactions_dupForm, duplicate_entries


In [3]:
badEntriesList = RemoveBadEntries(reactions, verbose_output = False)
doiEntriesList = RemoveDOIDuplicates(badEntriesList)

Filtered 20093 reactions out of total 31782
Filtered 19318 reactions out of total 20093


In [3]:
filtered_final, duplicate_node_match = RemoveNodeMatchDuplicates(doiEntriesList)

NameError: name 'doiEntriesList' is not defined

In [None]:
def split_reaction(reaction_string):
    
    """
    Splits a chemical reaction string into reactants and products.

    Parameters:
    reaction_string (str): The reaction string to be split, with reactants and products separated by '=='.

    Returns:
    Tuple[str, str]: A tuple containing the reactants and products as separate strings.
    """
        
    reactants, products = reaction_string.split('==')
    reactants = reactants.strip()
    products = products.strip()
    return reactants, products

def parse_chemical_formula(formula):

    """
    Parses a chemical formula into its constituent elements and their multiplicities.

    Parameters:
    formula (str): The chemical formula to be parsed.

    Returns:
    List[Tuple[str, str, str]]: A list of tuples, each containing an element, its multiplicity, and its atomic number.
    """

    pattern = r'([A-Z][a-z]*)(\d*\.?\d*)'
    matches = re.findall(pattern, formula)
    element_details = []
    for match in matches:
        element, multiplicity = match
        multiplicity = multiplicity if multiplicity else '1'
        atomic_number = re.findall(r'\d+', multiplicity)
        atomic_number = atomic_number[0] if atomic_number else '1'
        element_details.append((element, multiplicity, atomic_number))
    return element_details

def extract_element_details(reaction):

    """
    Extracts element details from a chemical reaction string.

    Parameters:
    reaction (str): The reaction string containing elements separated by '+'.

    Returns:
    List[Tuple[str, str, str]]: A list of tuples containing element details.
    """

    parts = reaction.split('+')
    element_details = []
    for part in parts:
        part = part.strip()
        element_details.extend(parse_chemical_formula(part))
    return element_details

def expand_element_details(element_details, prefix):

    """
    Expands element details into a dictionary with a given prefix.

    Parameters:
    element_details (List[Tuple[str, str, str]]): A list of tuples containing element details.
    prefix (str): A prefix for the dictionary keys.

    Returns:
    Dict[str, str]: A dictionary with element details expanded into key-value pairs.
    """

    data = {}
    for i, detail in enumerate(element_details):
        element, multiplicity, atomic_number = detail
        data[f'{prefix}_element_{i+1}'] = element
        data[f'{prefix}_multiplicity_{i+1}'] = multiplicity
        data[f'{prefix}_atomic_number_{i+1}'] = atomic_number
    return data

def extract_temperatures(operations):

    """
    Extracts sintering and calcination temperatures from a list of operations.

    Parameters:
    operations (List[Operation]): A list of operations, where each operation has a type and a token.

    Returns:
    Tuple[float, float]: A tuple containing sintering and calcination temperatures.
    """

    sintering_temp = None
    calcination_temp = None
    for operation in operations:
        if operation.type == 'HeatingOperation':
            if operation.token == 'sintered':
                sintering_temp = extract_temp(operation.conditions)
            elif operation.token == 'calcined':
                calcination_temp = extract_temp(operation.conditions)
    return sintering_temp, calcination_temp

def extract_temp(conditions):

    """
    Extracts the temperature from a set of conditions.

    Parameters:
    conditions (Conditions): Conditions containing heating temperature data.

    Returns:
    float: The first heating temperature value found, or None if not found.
    """
    
    if conditions.heating_temperature:
        for temp in conditions.heating_temperature:
            if temp.values:
                return temp.values[0]
    return None

In [None]:
# Convert to DataFrame
df = pd.DataFrame([{
    'doi': entry.doi,
    'paragraph_string': entry.paragraph_string,
    'synthesis_type': entry.synthesis_type,
    'reaction_string': entry.reaction_string,
    'targets_string': entry.targets_string,
    'sintering_temp': extract_temperatures(entry.operations)[0],
    'calcination_temp': extract_temperatures(entry.operations)[1]
} for entry in doiEntriesList])

# Apply the function to create new columns
df[['input_reaction', 'output_reaction']] = df['reaction_string'].apply(lambda x: pd.Series(split_reaction(x)))
df['input_elements'] = df['input_reaction'].apply(extract_element_details)
df['output_elements'] = df['output_reaction'].apply(extract_element_details)
input_expanded = df['input_elements'].apply(lambda x: pd.Series(expand_element_details(x, 'input')))
output_expanded = df['output_elements'].apply(lambda x: pd.Series(expand_element_details(x, 'output')))

# Concatenate the expanded details with the original DataFrame
df = pd.concat([df, input_expanded, output_expanded], axis=1)

# Drop temporary columns
df.drop(columns=['input_elements', 'output_elements'], inplace=True)

In [None]:
print(df.columns)

In [None]:
excel_path = './reaction_entries.xlsx'
df.to_excel(excel_path, index=False)

### Test code - de-duplication

In [None]:
payload = {}
for item, entries in duplicate_node_match.items():
    if len(entries) > 1:
        entrylist = []
        for entry in entries:
            entrylist.append(entry.to_dict())
        payload[entries[0].reaction_string] = entrylist

json_string = json.dumps(payload, indent=4,default=lambda o: o.__dict__)
filename = "output.json"

with open(filename, "w") as file:
    file.write(json_string)

print(f"JSON data saved to {filename}")

verbose_output = False 
duplicates = []
sum = 0
for key, entries in duplicate_node_match.items():
        if len(entries) > 1:
            for entry in entries:
                  duplicates.append({'reaction_string': entry.reaction_string,str(entry.doi): entry})
            # sum += len(entries)
            # if(verbose_output): print(len(entries), " Duplicates for: ", end='')
            # if(verbose_output): print(f"Right Side: {key[0]}", end='')
            # if(verbose_output): print("Target materials:", end='')
            # if(verbose_output): print(key[1][0], end='')
            # if(verbose_output): print("Precursor materials:", end='')
            # if(verbose_output): print(key[1][1], end='')
            # if(verbose_output): print()
            # for i in range(len(entries)):
            #     entry = entries[i]
            #     if(verbose_output): print("Entry #{}: ".format(i), end='')
            #     calc_operations = [op for op in entry.operations if "calc" in op.token]
            #     if(verbose_output): print("Reaction String {},".format(str(entry)))
            #     # for op in calc_operations:
            #     #     if(verbose_output): print(op)
            # print()