In [1]:
from chemdataextractor import Document
import re
import json
import csv
import pandas as pd
from gensim.models import Word2Vec
from mat2vec.processing import MaterialsTextProcessor
import numpy as np
import copy

# Auxiliary functions

In [2]:
# Location of the pretrained mat2vec Word2Vec embeddings (relative path).
model_path = 'mat2vec/training/models/pretrained_embeddings'
# Module-level embedding model; calculate_similarity reads it as a global.
w2v_model = Word2Vec.load(model_path)

def calculate_phrase_vector(model, phrase):
    '''
    Build a single embedding for a phrase.
    The phrase is split on whitespace; every token known to model.wv contributes
    its vector, and the result is the element-wise mean of those vectors.
    Output: The mean vector, or None when no token of the phrase is in the model.
    '''
    embeddings = model.wv
    known_vectors = []
    for token in phrase.split():
        if token in embeddings:
            known_vectors.append(embeddings[token])

    if not known_vectors:
        return None
    return np.mean(known_vectors, axis=0)

def calculate_similarity(element1, element2):
    '''
    Score how similar two text elements are.
    Both elements are embedded with calculate_phrase_vector using the module-level
    w2v_model. If both have a vector, the cosine similarity is returned; otherwise
    the score falls back to an exact string comparison.
    Output: The similarity score (cosine value, or 1 / 0 for the string fallback)
    '''
    first_vector = calculate_phrase_vector(w2v_model, element1)
    second_vector = calculate_phrase_vector(w2v_model, element2)

    # At least one element has no known token: only identical strings count as similar.
    if first_vector is None or second_vector is None:
        return 1 if element1 == element2 else 0

    norm_product = np.linalg.norm(first_vector) * np.linalg.norm(second_vector)
    return np.dot(first_vector, second_vector) / norm_product

def extract_chemical_info(text):
    '''
    Extracting NFA entities using ChemdataExtractor.

    The text is parsed into a ChemDataExtractor Document; the chemical mentions
    (doc.cems) and the abbreviation definitions (doc.abbreviation_definitions)
    are collected from it.

    Output: A tuple (materials, abbreviations). materials is a de-duplicated list
    of mention strings (built from a set, so its order is not guaranteed);
    abbreviations maps each non-empty abbreviation to its definition.
    '''
    doc = Document(text)
    materials = [c.text for c in doc.cems]
    abbreviations = {}
    for item in doc.abbreviation_definitions:
        # The parentheses matter: without them the conditional applies to item[1]
        # only, so a short item would raise on item[0] or yield a tuple definition.
        abbr, definition = (item[0], item[1]) if len(item) >= 2 else ('', '')
        if isinstance(abbr, list):
            # Token lists are reduced to their first token for the abbreviation ...
            abbr = abbr[0] if abbr else ''
        if isinstance(definition, list):
            # ... and joined with spaces for the definition.
            definition = ' '.join(definition)
        if abbr:
            abbreviations[abbr] = definition

    return list(set(materials)), abbreviations

def rule_based_extension(text, materials, special_chars=('–', ':', '/', '@')):
    '''
    Rule-based enhancement of ChemdataExtractor's ability to extract NFA entities.

    Every occurrence of each material in text is inspected. If the character
    directly before or after the occurrence is one of special_chars, the mention
    is widened to the surrounding run of text, bounded by the neighbouring
    whitespace or by the start / end of text when there is no whitespace on that
    side (e.g. 'PEDOT' in 'PEDOT:PSS is ...' becomes 'PEDOT:PSS'). Otherwise the
    material is kept unchanged.

    Parameters:
        text: The text the materials were extracted from.
        materials: Iterable of material strings; empty strings are ignored.
        special_chars: Characters that glue two mentions into one entity.

    Output: De-duplicated list of (possibly extended) materials; built from a set,
    so its order is not guaranteed.
    '''
    extended_materials = []
    text_length = len(text)

    for material in materials:
        if not material:
            # An empty pattern matches at every position and would only add noise.
            continue
        for match in re.finditer(re.escape(material), text):
            pos = match.start()
            left_index = pos - 1
            right_index = pos + len(material)
            left_char = text[left_index] if left_index >= 0 else ''
            right_char = text[right_index] if right_index < text_length else ''

            if left_char in special_chars or right_char in special_chars:
                extended_left = re.search(r'\s[^ ]*$', text[:pos])
                extended_right = re.search(r'^[^ ]*\s', text[right_index:])

                # No whitespace on a side means the entity runs to that edge of the
                # text; falling back to left_index / right_index would cut it short
                # (or produce a negative start index when the mention opens the text).
                left_boundary = extended_left.start() + 1 if extended_left else 0
                right_boundary = right_index + extended_right.end() - 1 if extended_right else text_length

                extended_materials.append(text[left_boundary:right_boundary])
            else:
                extended_materials.append(material)

    return list(set(extended_materials))

def correct_entities(entity, st_list):
    '''
    Decide whether an extracted entity should be kept.
    Entities containing a supplementary vocabulary term ('TiO2', 'CdS') are always
    accepted. Any other entity (except the placeholder '-') is accepted when its
    similarity to at least one element of the standard list reaches 0.8.
    Output: True if the entity is similar to any element in standard list else False
    '''
    if any(term in entity for term in ('TiO2', 'CdS')):
        return True

    # The '-' placeholder stands for "no entity" and can never be confirmed.
    if entity == '-':
        return False

    return any(calculate_similarity(entity, candidate) >= 0.8 for candidate in st_list)


# Sample:
# text_paragraph = "ethylenediamine (EDA). Organic/nanostructured Si hybrid solar cells have achieved high power conversion efficiency (PCE) and shortcircuit current density (JSC), due to the excellent light harvesting ability of nanostructured Si. However, the separation of carriers is mainly affected by the low work function of PH1000-type PEDOT:PSS, resulting in poor open circuit voltage (VOC). In this work, an efficient and stable MoO3-doped copper phthalocyanine-3,4 ',4 '',4 '''- tetra-sulfonated acid tetra sodium salt (TS-CuPc) organic small molecule film was introduced between the heterojunction interfaces. The photogenerated carrier separation is promoted by the enhanced built-in potential (V-bi) owing to the high work function of TS-CuPc:MoO3, which also suppresses the carrier recombination at the surface of nanostructured Si. As a result, for PEDOT:PSS/nanostructured Si photovoltaic devices, the PCE was greatly improved from 14.15% to 16.09%. The excellent charge separation properties and interface passivation effect enable efficient devices to demonstrate the vast potential of this new type of interlayer in photovoltaic applications."
# materials, abbreviations = extract_chemical_info(text_paragraph)
# materials = rule_based_extension(text_paragraph, materials)
# print(materials,'\n', abbreviations)

# NFA Label Standardization and Correction

In [3]:
def NFA_SC(file_path):
    '''
    Standardize and correct the Name / Formula / Acronym (NFA) labels of an inference result.

    Each non-blank line of the file is parsed as a JSON object with the keys
    'paper', 'instruction' and 'response'. The abstract is the text between
    '### Input:\\n' and '\\n\\n### Response:' in 'instruction'; the model answer is
    the text after '### Response:' in 'response', split on ' & ' into JSON parts.
    Every part is expected to hold list-valued 'Name', 'Formula' and 'Acronym'
    keys; only the first value of each list is considered.

    Correction steps applied to each part:
        SC1: drop labels that correct_entities cannot confirm against the chemical
             mentions / abbreviations extracted from the abstract.
        SC2: move an upper-case lone Name or Formula to Acronym, and a
             non-upper-case lone Acronym to Name.
        SC3: align Name and Acronym with the abbreviation definitions found in the
             abstract (similarity threshold 0.85).
        SC4: skip parts with no label left and remove duplicated labels.

    Parts that raise during processing are skipped.

    Output: A list with one list per input line: the corrected NER dicts followed
    by the value of 'paper' as last element.
    '''
    res = []
    # Papers for which a part lost all three labels in SC4.
    # NOTE(review): this list is filled but never returned or read in this function.
    delete_res = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline) instead of failing in json.loads.
                continue
            entry = json.loads(line.strip())
            doi_part = entry['paper']
            abs_part = entry['instruction'].split('### Input:\n')[1].split('\n\n### Response:')[0].strip()
            response_part = entry['response'].replace('</s>', '').split('### Response:')[1].strip().split(' & ')
            temp = []
            # (Standard_list, abbreviations) for this abstract. It depends on abs_part
            # only, so it is computed once (lazily, inside the try) and reused.
            chem_info = None
            for part in response_part:
                if part.strip():
                    try:
                        NERoutput = json.loads(part)

                        # Extract chemical information and initialize the standard set for subsequent similarity comparison
                        if chem_info is None:
                            materials, abbreviations = extract_chemical_info(abs_part)
                            materials = rule_based_extension(abs_part, materials)
                            Standard_set = set(materials)
                            Standard_set.update(abbreviations.keys())
                            Standard_set.update(abbreviations.values())
                            # Keep Chem elements single and accelerate efficiency
                            chem_info = (list(Standard_set), abbreviations)
                        Standard_list, abbreviations = chem_info

                        Name = NERoutput['Name'][0] if NERoutput['Name'] else '-'
                        Formula = NERoutput['Formula'][0] if NERoutput['Formula'] else '-'
                        Acronym = NERoutput['Acronym'][0] if NERoutput['Acronym'] else '-'

                        # SC1: Verify if Name, Formula, and Acronym are in the standard list. If not, set them to '-'.
                        if Name != '-':
                            if not correct_entities(Name, Standard_list):
                                Name = '-'
                                NERoutput['Name'].clear()

                        if Formula != '-':
                            if not correct_entities(Formula, Standard_list):
                                Formula = '-'
                                NERoutput['Formula'].clear()

                        if Acronym != '-':
                            if not correct_entities(Acronym, Standard_list):
                                Acronym = '-'
                                NERoutput['Acronym'].clear()

                        # SC2: Correct Acronym tags.
                        if Name == '-' and Formula == '-' and Acronym != '-':
                            if Acronym.isupper() or (Acronym.endswith('s') and Acronym[:-1].isupper()):
                                # NOTE(review): this skips the whole part, so an entity whose
                                # only label is an upper-case acronym never reaches the output.
                                # Confirm this is intended (a no-op would keep it).
                                continue
                            else:
                                Name = Acronym
                                Acronym = '-'
                                NERoutput['Acronym'].clear()

                        if Name == '-' and Formula != '-' and Acronym == '-':
                            if Formula.isupper() or (Formula.endswith('s') and Formula[:-1].isupper()):
                                Acronym = Formula
                                Formula = '-'
                                NERoutput['Formula'].clear()

                        if Name != '-' and Formula == '-' and Acronym == '-':
                            if Name.isupper() or (Name.endswith('s') and Name[:-1].isupper()):
                                Acronym = Name
                                Name = '-'
                                NERoutput['Name'].clear()

                        # SC3: Ensure that Name and Acronym correspond.
                        if not abbreviations:
                            Acronym = '-'
                            NERoutput['Acronym'].clear()
                        elif Name != '-':
                            # Closest abbreviation definition to the predicted Name.
                            sim_Name = {value: calculate_similarity(Name, value) for value in abbreviations.values()}
                            max_sim_Name = max(sim_Name, key=sim_Name.get)
                            max_Name = sim_Name[max_sim_Name]
                            if max_Name > 0.85:
                                Name = max_sim_Name
                                if Formula != '-':
                                    # A Formula that matches an abbreviation is really an Acronym.
                                    sim_Formula = {key: calculate_similarity(Formula, key) for key in abbreviations.keys()}
                                    max_sim_Formula = max(sim_Formula, key=sim_Formula.get)
                                    max_Formula = sim_Formula[max_sim_Formula]
                                    if max_Formula > 0.85:
                                        Acronym = max_sim_Formula
                                        Formula = '-'
                                        NERoutput['Formula'].clear()
                                # The abbreviation defined for the matched Name wins.
                                for key, value in abbreviations.items():
                                    if value == max_sim_Name:
                                        Acronym = key
                        elif Acronym != '-':
                            # Closest abbreviation to the predicted Acronym.
                            sim_Acronym = {key: calculate_similarity(Acronym, key) for key in abbreviations.keys()}
                            max_sim_Acronym = max(sim_Acronym, key=sim_Acronym.get)
                            max_Acronym = sim_Acronym[max_sim_Acronym]
                            if max_Acronym > 0.85:
                                Acronym = max_sim_Acronym
                                if Formula != '-':
                                    # Compare against every abbreviation (the comprehension
                                    # previously reused a stale loop variable and held one key).
                                    sim_Formula = {key: calculate_similarity(Formula, key) for key in abbreviations.keys()}
                                    max_sim_Formula = max(sim_Formula, key=sim_Formula.get)
                                    max_Formula = sim_Formula[max_sim_Formula]
                                    if max_Formula > 0.85:
                                        Formula = '-'
                                        NERoutput['Formula'].clear()
                                # Name always becomes the definition of the matched acronym.
                                Name = abbreviations[max_sim_Acronym]
                            else:
                                Name = '-'
                                NERoutput['Name'].clear()
                                Acronym = '-'
                                NERoutput['Acronym'].clear()
                        elif Formula != '-':
                            # Only a Formula is left: first try it as a definition ...
                            sim_val = {value: calculate_similarity(Formula, value) for value in abbreviations.values()}
                            max_sim_val = max(sim_val, key=sim_val.get)
                            max_val = sim_val[max_sim_val]
                            if max_val > 0.85:
                                Name = max_sim_val
                                for key, value in abbreviations.items():
                                    if value == max_sim_val:
                                        Acronym = key
                                        Formula = '-'
                                        NERoutput['Formula'].clear()
                            if Formula != '-':
                                # ... then as an abbreviation.
                                sim_key = {key: calculate_similarity(Formula, key) for key in abbreviations.keys()}
                                max_sim_key = max(sim_key, key=sim_key.get)
                                max_key = sim_key[max_sim_key]
                                if max_key > 0.85:
                                    Acronym = max_sim_key
                                    Name = abbreviations[max_sim_key]
                                    Formula = '-'
                                    NERoutput['Formula'].clear()

                        #  SC4: Handle empty abbreviations, after SC, if all these label is empty, the output is invalid.
                        if Name == '-' and Formula == '-' and Acronym == '-':
                            delete_res.append([doi_part])
                            continue
                        elif Name != '-' and Formula != '-':
                            if Name == Formula:
                                Formula = '-'
                                NERoutput['Formula'].clear()
                        elif Name != '-' and Acronym != '-':
                            if Name == Acronym:
                                Acronym = '-'
                                NERoutput['Acronym'].clear()
                        elif Formula != '-' and Acronym != '-':
                            if Formula == Acronym:
                                Acronym = '-'
                                NERoutput['Acronym'].clear()

                        # Write the surviving labels back as single-element lists.
                        if Name != '-':
                            NERoutput['Name'].clear()
                            NERoutput['Name'].append(Name)
                        if Formula != '-':
                            NERoutput['Formula'].clear()
                            NERoutput['Formula'].append(Formula)
                        if Acronym != '-':
                            NERoutput['Acronym'].clear()
                            NERoutput['Acronym'].append(Acronym)

                        temp.append(NERoutput)
                    except Exception:
                        # Best effort: a malformed part is skipped. Exception (rather than
                        # a bare except) keeps KeyboardInterrupt / SystemExit working.
                        continue
            temp.append(doi_part)
            res.append(temp)
    return res

# Run the NFA label correction on the inference result and display the corrected output.
NERoutput_NFASC = NFA_SC('./NC/NC_result.jsonl') # Load your inference result here
NERoutput_NFASC

[[{'Name': [],
   'Formula': ['BaCe1 − xNdxO3 − x 2'],
   'Acronym': [],
   'Descriptor': ['polymetric'],
   'Structure/Phase': ['perovskites'],
   'Application': [{'neodymium doping': []}],
   'Property': ["'resolution': [1000]",
    "'spontaneous strain': [0.05]",
    "'opaque': []",
    "'transparent': []",
    "'reflectance': [1000]"],
   'Synthesis': [],
   'Characterization': ['neutron powder diffraction']},
  ''],
 [{'Name': ['NaFeTi2F11'],
   'Formula': [],
   'Acronym': [],
   'Descriptor': ['cathode'],
   'Structure/Phase': [],
   'Application': [{'sodium-based batteries': ["'intercalation voltage': [4.0] (property)"]}],
   'Property': [],
   'Synthesis': [],
   'Characterization': []},
  ''],
 [''],
 [''],
 [''],
 [{'Name': ['Battery manganese dioxide'],
   'Formula': [],
   'Acronym': [],
   'Descriptor': ['electrochemically deposited', 'doped'],
   'Structure/Phase': [],
   'Application': [{'batteries': []}],
   'Property': ["'etymology': []"],
   'Synthesis': ['electroche

# Other Label Standardization and Correction

In [4]:
# Work on a deep copy: OTHER_SC and process_property modify their input in place,
# and NERoutput_NFASC should keep the result of the NFA stage.
NERoutput_OTHSC = copy.deepcopy(NERoutput_NFASC)

def _select_keyword(term, keywords):
    '''
    Pick the dictionary keyword used to standardize term.
    The running best similarity is tracked over all keywords; a keyword is selected
    only when it improves on that best value and exceeds its own threshold (percent).
    Output: The selected keyword tuple, or None when nothing qualifies.
    '''
    max_similarity = 0
    selected_keyword = None
    for keyword_tuple in keywords:
        keyword, threshold, _, _ = keyword_tuple
        similarity = calculate_similarity(term, keyword)
        if similarity > max_similarity:
            max_similarity = similarity
            if similarity > threshold / 100:
                selected_keyword = keyword_tuple
    return selected_keyword


def _standard_term(selected_keyword, name2abbr):
    '''
    Resolve a selected keyword tuple to (standard term, category).
    With abbr_condition 'a' the keyword is looked up among the values of name2abbr
    and replaced by the matching key; otherwise the keyword itself is used.
    '''
    keyword, _, abbr_condition, category = selected_keyword
    if abbr_condition == 'a':
        term = next((k for k, v in name2abbr.items() if v == keyword), keyword)
    else:
        term = keyword
    return term, category


def OTHER_SC(abbr_path, keyword_path, NERoutput):
    '''
    Standardize the 'Structure/Phase', 'Synthesis', 'Characterization' and
    'Application' labels against a keyword dictionary.

    Parameters:
        abbr_path: Excel file without header; column 0 is mapped to column 1.
        keyword_path: Excel file without header whose rows are
            (keyword, threshold in percent, abbr_condition, category).
        NERoutput: List of entries, each a list of NER dicts followed by one
            trailing element that is not processed. It is modified in place.

    Every entity is replaced by its selected keyword. When the keyword belongs to
    another category the entity is moved there, and entities without a qualifying
    keyword are removed. For 'Application' the same is done on the keys of each
    application dict.

    Output: The same NERoutput object.
    '''
    abbr_df = pd.read_excel(abbr_path, header=None)
    keywords_df = pd.read_excel(keyword_path, header=None)

    name2abbr = pd.Series(abbr_df[1].values, index=abbr_df[0]).to_dict()
    keywords = list(keywords_df.itertuples(index=False, name=None))

    for entries in NERoutput:
        for entry in entries[:-1]:
            if not isinstance(entry, dict):
                continue
            # Snapshot of the keys: categories may be added while processing.
            for key in list(entry.keys()):
                if key in ['Structure/Phase', 'Synthesis', 'Characterization']:
                    # Iterate over a snapshot and rebuild the list. Removing items from
                    # the list being enumerated skipped the element after each removal.
                    kept = []
                    for entity in list(entry[key]):
                        selected_keyword = _select_keyword(entity, keywords)
                        if not selected_keyword:
                            continue  # no qualifying keyword: the entity is dropped
                        new_entity, category = _standard_term(selected_keyword, name2abbr)
                        if category == key:
                            kept.append(new_entity)
                        elif category == 'Application':
                            # Applications live as keys of the first application dict;
                            # create it when the list is missing or empty.
                            if not entry.get('Application'):
                                entry['Application'] = [{}]
                            # setdefault keeps properties already attached to this application.
                            entry['Application'][0].setdefault(new_entity, [])
                        else:
                            if category not in entry:
                                entry[category] = []
                            if new_entity not in entry[category]:
                                entry[category].append(new_entity)
                    entry[key][:] = kept

                elif key == 'Application':
                    for app_dict in entry['Application']:
                        app_keys_to_delete = []
                        for app_key in list(app_dict.keys()):
                            selected_keyword = _select_keyword(app_key, keywords)
                            if not selected_keyword:
                                app_keys_to_delete.append(app_key)
                                continue
                            new_app_key, category = _standard_term(selected_keyword, name2abbr)
                            if category != 'Application':
                                # Not an application after all: move the term to its
                                # category (the attached values are discarded).
                                app_dict.pop(app_key)
                                if category not in entry:
                                    entry[category] = []
                                if new_app_key not in entry[category]:
                                    entry[category].append(new_app_key)
                            elif new_app_key != app_key:
                                app_dict[new_app_key] = app_dict.pop(app_key)
                        for key_to_delete in app_keys_to_delete:
                            del app_dict[key_to_delete]
    return NERoutput

def process_property(file_path, NERoutput):
    '''
    Standardize property names in place.
    The Excel file (no header) provides (keyword, threshold) pairs from its first
    two columns. Every property line stored under 'Property', and every line in the
    value lists of the 'Application' dicts, is passed through
    extract_and_replace_property; lines for which it returns no text are dropped.
    The last element of each entry is not processed.
    '''
    keywords_df = pd.read_excel(file_path, header=None)
    keywords = [(row[0], row[1]) for _, row in keywords_df.iterrows()]

    def _standardize(lines):
        # Keep only the rewritten lines that are non-empty.
        rewritten = []
        for text in lines:
            _, new_text = extract_and_replace_property(text, keywords)
            if new_text:
                rewritten.append(new_text)
        return rewritten

    for entry in NERoutput:
        if not entry:
            continue
        for dict_entry in entry[:-1]:
            if 'Application' in dict_entry:
                for app_dict in dict_entry['Application']:
                    for key, values in app_dict.items():
                        app_dict[key] = _standardize(values)
            if 'Property' in dict_entry:
                dict_entry['Property'] = _standardize(dict_entry['Property'])


def extract_and_replace_property(line, keywords):
    '''
    Standardize the property name of one property line.

    A property line is expected to look like "'<name>': <value>". The name between
    the first pair of single quotes is compared with every keyword; if the most
    similar keyword exceeds its threshold (given in percent), the leading quoted
    name is replaced by that keyword.

    Parameters:
        line: The property line.
        keywords: Iterable of (keyword, threshold) pairs.

    Output: (property_name, new_line); new_line equals line when no keyword
    qualifies. (None, None) is returned when the line does not have the expected
    form or cannot be processed (the offending line is printed in that case).
    '''
    try:
        stripped = line.strip()
        if stripped.startswith("'") and "':" in line:
            property_name = stripped.split("'")[1]
            max_similarity = 0
            replacement_keyword = None
            max_threshold = 0

            for keyword, threshold in keywords:
                similarity = calculate_similarity(property_name, keyword)
                if similarity > max_similarity:
                    max_similarity = similarity
                    replacement_keyword = keyword
                    max_threshold = threshold

            if max_threshold and max_similarity > max_threshold / 100:
                # count=1: only the leading name is rewritten, never an identical
                # quoted string that happens to appear in the value part.
                new_line = line.replace(f"'{property_name}'", f"'{replacement_keyword}'", 1)
                return property_name, new_line
            return property_name, line
    except Exception:
        # Best effort: report the line that could not be processed and drop it.
        print(line)
    return None, None

# Standardize the non-NFA labels, then the property names (process_property works
# in place and returns nothing), and display the result.
NERoutput_OTHSC = OTHER_SC('./dictionary/abbreviation.xlsx', './dictionary/keywords.xlsx', NERoutput_OTHSC)
process_property('./dictionary/property.xlsx', NERoutput_OTHSC)
NERoutput_OTHSC

[[{'Name': [],
   'Formula': ['BaCe1 − xNdxO3 − x 2'],
   'Acronym': [],
   'Descriptor': ['polymetric'],
   'Structure/Phase': ['perovskites'],
   'Application': [{}],
   'Property': ["'resolution': [1000]",
    "'spontaneous strain': [0.05]",
    "'opaque': []",
    "'transparent': []",
    "'reflectance': [1000]"],
   'Synthesis': [],
   'Characterization': []},
  ''],
 [{'Name': ['NaFeTi2F11'],
   'Formula': [],
   'Acronym': [],
   'Descriptor': ['cathode'],
   'Structure/Phase': [],
   'Application': [{}],
   'Property': [],
   'Synthesis': [],
   'Characterization': []},
  ''],
 [''],
 [''],
 [''],
 [{'Name': ['Battery manganese dioxide'],
   'Formula': [],
   'Acronym': [],
   'Descriptor': ['electrochemically deposited', 'doped'],
   'Structure/Phase': [],
   'Application': [{}],
   'Property': ["'etymology': []"],
   'Synthesis': ['electrochemical deposition '],
   'Characterization': []},
  ''],
 [{'Name': [],
   'Formula': ['La1-xCexCoO3'],
   'Acronym': [],
   'Descriptor'

# Binary Classification

In [23]:
# Deep copy for the classification stage; check_and_replace later edits it in place.
NERoutput_FT = copy.deepcopy(NERoutput_OTHSC)

def NFA_FT(ner_output):
    '''
    Build the Name-vs-Formula classification prompts for an NER output.
    For every entry whose last element is a string (used as "resource"), one record
    is produced per value found under 'Name' and then under 'Formula' of each
    preceding element.
    Output: A list of dicts with the keys "resource" and "instruction".
    '''
    prompt_head = (
        "Below is an instruction that describes a task, paired with an input that provides further context. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\nTell me if the given material/chemical term belongs to the material/chemical Name or Formula."
        "\n\n### Input:\n"
    )
    prompt_tail = "\n\n### Response:"

    records = []
    for item in ner_output:
        doi = item[-1]
        if not isinstance(doi, str):
            continue
        for element in item[:-1]:
            # Names first, then formulas, for each element.
            for label in ('Name', 'Formula'):
                if label in element and element[label]:
                    for term in element[label]:
                        records.append({
                            "resource": doi,
                            "instruction": prompt_head + term + prompt_tail,
                        })

    return records

# Apply the transformation to the NER output
# Apply the transformation to the NER output.
# The result gets its own name: assigning it to NFA_FT would replace the function
# with a list, so running this cell a second time would raise a TypeError.
NFA_FT_records = NFA_FT(NERoutput_FT)

# Save the transformed data as JSON Lines (one JSON object per line)
output_file_path = './NFA_FT/classification7.json'
with open(output_file_path, 'w') as f:
    for item in NFA_FT_records:
        f.write(json.dumps(item) + "\n")

# Next, run 'classification7.json' through the LLM classifier and save its responses to './NFA_FT/processed_NFA7.json', which the next cell loads

In [26]:
responses_file_path = './NFA_FT/processed_NFA7.json' # Load the classified result here.
responses_data = []

# The file is read as JSON Lines: every line is parsed as one JSON object.
with open(responses_file_path, 'r') as file:
    for line in file:
        response = json.loads(line)
        responses_data.append(response)

# Term -> classifier answer; filled by update_nfaft_from_response.
NFAFT_dict = {}
        
def update_nfaft_from_response(response):
    '''
    Parse one classified record and store its answer in the module-level NFAFT_dict.

    response["response"] is expected to contain the prompt followed by the answer:
    the term is the text between '### Input:\\n' and '### Response:', the answer is
    the text after '### Response:' up to '</s>' (or to the end of the text when no
    '</s>' is present). Records missing either marker are skipped.
    '''
    text = response["response"]
    input_marker = "### Input:\n"
    response_marker = "### Response:"

    input_pos = text.find(input_marker)
    response_pos = text.find(response_marker)
    if input_pos == -1 or response_pos == -1:
        # str.find returns -1 for a missing marker; slicing with it would silently
        # store a garbled term / answer, so malformed records are ignored instead.
        return

    input_start = input_pos + len(input_marker)
    input_end = text.find("\n\n" + response_marker)
    if input_end == -1:
        input_end = response_pos
    input_term = text[input_start:input_end].strip()

    response_start = response_pos + len(response_marker)
    response_end = text.find("</s>", response_start)
    if response_end == -1:
        # No end-of-sequence token: keep the whole remainder. Slicing up to -1
        # would cut off the last character of the answer.
        response_end = len(text)
    extracted_response = text[response_start:response_end].strip()

    NFAFT_dict[input_term] = extracted_response

# Fill NFAFT_dict from every loaded record.
for response in responses_data:
    update_nfaft_from_response(response)

def check_and_replace(ner_output, NFAFT_dict):
    '''
    Move Name / Formula values to the other label according to the classification result.
    For every dict element of every entry (the last element of an entry is skipped),
    each value under 'Name' and then 'Formula' that appears in NFAFT_dict with an
    answer different from its current label is appended to the opposite label and
    removed from the current one. The NER output is modified in place.
    '''
    opposite_of = {'Name': 'Formula', 'Formula': 'Name'}

    for item in ner_output:
        for element in item[:-1]:
            if not isinstance(element, dict):
                continue
            for label, other_label in opposite_of.items():
                if not element.get(label):
                    continue
                # Iterate over a copy because values are removed while looping.
                for term in list(element[label]):
                    if term not in NFAFT_dict:
                        continue
                    if NFAFT_dict[term] == label:
                        continue
                    if not element.get(other_label):
                        element[other_label] = []
                    element[other_label].append(term)
                    element[label].remove(term)

# Apply the classifier answers to NERoutput_FT (modified in place).
check_and_replace(NERoutput_FT, NFAFT_dict)

# Save SCNERoutput

In [17]:
def save_to_json(data, file_name):
    '''
    Serialize data as JSON (4-space indentation) into file_name.
    The file is opened for writing with UTF-8 encoding, replacing existing content.
    '''
    with open(file_name, mode='w', encoding='utf-8') as out_file:
        json.dump(obj=data, fp=out_file, indent=4)

# Write the final corrected NER output to ER_RES.json in the working directory.
save_to_json(NERoutput_FT, 'ER_RES.json')