In [1]:
from threading import Lock
from typing import Optional, Literal

import dspy
from dspy.utils.callback import BaseCallback
from pydantic import BaseModel, Field, field_validator


class SwappedChatAdapter(dspy.adapters.ChatAdapter):
    def format(self, signature, demos, inputs):
        messages:list = super().format(signature, demos, inputs)
        systeme_message = messages.pop(0)
        messages.insert(-1, systeme_message)
        return messages

lm = dspy.LM('openai/gpt-4o', cache=False)
dspy.configure(lm=lm, adapter=SwappedChatAdapter())

# first exclude non-solid-state reactions
# then ask it to resolve abbreviated or un-identified chmeical formula using context.
# discard any reactions which are not completed or not chemical reaction, exclude any in-valid entries

class Target(BaseModel):
    target_formula: str = Field(description='strictly agree with IUPAC naming convention')
    amount_var: dict[str, list[float]] = Field(description='the amount variable in the chemical formula, e.g. AxBC, {x: [1, 2]}')

class Reaction(BaseModel):
    precursors: list[str] = Field(description='strictly agree with IUPAC naming convention')
    additives: list[str]
    target: Target
    reaction_type: Literal['solid-state', 'sol-gel', 'co-precipitation', 'hydrothermal', 'flux', 'others'] = Field(description='the type of reaction, choose others if you are uncertain')

class QA(dspy.Signature):
    """Your objective is to extract all chemical reactions described in the text, regardless of prominence. Each reaction should include:
    1. Explicitly mentioned precursors and target products and do not infer any missing reactants or products.
    2. Reaction type, as classified into solid-state, sol-gel, hydrothermal, co-precipitation, flux, or others.

    Ensure reasoning includes all reactions described, mentioning each reaction process briefly.

    Examples are provided for reference."""
    text: str = dspy.InputField(desc='a piece of text which may contains chemical reactions')
    reactions: Optional[list[Reaction]] = dspy.OutputField(desc='the reactions extracted from the text, return null if no reaction found')

class UnidentifiedTerm(dspy.Signature):
    """You are an chemist expert. You objective is to check if there is any acronym or un-identified chemical elements such as R or M in the given chemical names"""
    chemical_names: list[str] = dspy.InputField(desc='chemical names separated by comma')
    have: bool = dspy.OutputField(desc='have acronym or un-identified chemical elements or none of them')
    unidentified_names: Optional[list[str]] = dspy.OutputField(desc='acronyms and un-identified chemicals, return null if have is False')

class ResolveTerms(dspy.Signature):
    """You are an chemist expert. You objective is to find refered chemicals with given context for the un-identified chemical names and use them to replace the un-identified names in reactions.
    If you can't find the refered chemicals, then discard relevant reactions."""
    context: str = dspy.InputField()
    unidentified_names: list[str] = dspy.InputField()
    reactions: list[Reaction] = dspy.InputField()
    reactions_with_sub: list[Reaction] = dspy.OutputField()

class ClearInvalidReactions(dspy.Signature):
    """You are an chemist expert. Your objective is to clear any invalid chmical reactions from the given list of reactions.
    the reactions are invalid if they are not completed or not chemical reaction or have any in-valid chemical formulas."""
    reactions: list[Reaction] = dspy.InputField()
    valid_reactions: list[Reaction] = dspy.OutputField()

class ExtractReaction(dspy.Module):
    def __init__(self):
        self.extractor = dspy.ChainOfThought(signature=QA)
        self.checker = dspy.ChainOfThought(signature=UnidentifiedTerm)
        self.resolver = dspy.ChainOfThought(signature=ResolveTerms)
        self.cleaner = dspy.ChainOfThought(signature=ClearInvalidReactions)
        
    def forward(self, text, context):
        extraction = self.extractor(text=text)
        reactions = extraction.reactions
        if not reactions:
            return
        solid_reactions = [reaction for reaction in reactions if reaction.reaction_type == 'solid-state']
        if not solid_reactions:
            return
        chemical_names = []
        for reaction in solid_reactions:
            chemical_names.extend(reaction.precursors)
            chemical_names.append(reaction.target.target_formula)
        chemical_names = list(set(chemical_names))
        checking = self.checker(chemical_names=chemical_names)
        if checking.have and checking.unidentified_names:
            resolving = self.resolver(context=context, unidentified_names=checking.unidentified_names, reactions=solid_reactions)
            solid_reactions = resolving.reactions_with_sub
        removed_incomplete = [reaction for reaction in solid_reactions if reaction.precursors]
        return self.cleaner(reactions=removed_incomplete).valid_reactions

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
compiled_program = ExtractReaction()
# compiled_program.load('compiled_direct_with_element_var_version_2')

In [3]:
demos = compiled_program.predictor.demos
keeped_demos = []
keeped_demos.append(demos[0])
keeped_demos.append(demos[2])
keeped_demos.append(demos[3])
keeped_demos.append(demos[-1])
compiled_program.predictor._predict.demos = keeped_demos
compiled_program.predictor.demos

[{'augmented': True,
  'text': '(1-x)(Ca0.88Sr0.12)TiO3–x(Bi0.5Na0.5)TiO3 [hereafter referred to as (1−x)CST–xBNT] ceramics (x\u2009=\u20090.025, 0.050, 0.075, 0.10) were prepared by the conventional solid-state reaction method. The starting materials were high-purity grade powders (>\u200999%): CaCO3, SrCO3, Bi2O3, Na2CO3 and TiO2. CST and BNT powders were synthesized separately by mixing the starting materials according to the desired stoichiometry, and milled with ZrO2 balls and deionized water for 6 h in nylon jars. After drying, the powders were calcined at 1100 °C for 3 h and 850 °C for 3 h, respectively. The resulting powders were re-milled for 6 h. After that, the dried and re-milled mixtures were granulated with appropriate poly vinyl alcohol (PVA) as binder and uniaxial pressed at 300 MPa into cylinders with approximate size of 15 mm in diameter and 7.3 mm in thickness. The cylinders with x\u2009=\u20090.05, 0.075 and 0.1 were sintered in air at 1225–1300 °C for 3 h, and the 

In [4]:
# since the compiled model signature not compatible with current version, we need to update the signature
compiled_program.predictor.extended_signature.instructions = """Your objective is to extract all chemical reactions described in the text, regardless of prominence. Each reaction should include:
1. Explicitly mentioned precursors and target products.
2. Reaction type, as classified into solid-state, sol-gel, hydrothermal, co-precipitation, flux, or others.

Ensure reasoning includes all reactions described, mentioning each reaction process briefly.

Examples are provided for reference."""

In [6]:
compiled_program.predictor.extended_signature

StringSignature(text -> reasoning, reactions
    instructions='Your objective is to extract all chemical reactions described in the text, regardless of prominence. Each reaction should include:\n1. Explicitly mentioned precursors and target products.\n2. Reaction type, as classified into solid-state, sol-gel, hydrothermal, co-precipitation, flux, or others.\n\nEnsure reasoning includes all reactions described, mentioning each reaction process briefly.\n\nExamples are provided for reference.'
    text = Field(annotation=str required=True json_schema_extra={'desc': 'a piece of text which may contains chemical reactions', '__dspy_field_type': 'input', 'prefix': 'Text:'})
    reasoning = Field(annotation=str required=True json_schema_extra={'prefix': "Reasoning: Let's think step by step in order to", 'desc': '${reasoning}', '__dspy_field_type': 'output'})
    reactions = Field(annotation=Union[list[Reaction], NoneType] required=True json_schema_extra={'desc': 'the reactions extracted from 

In [3]:
import re
from sisyphus.utils.tenacity_retry_utils import pydantic_validate_retry_wraps
from sisyphus.utils.helper_functions import return_valid


exp_section_pattern = re.compile(r'\b(?:experiment(?:al|s|ing|ed)?|synthesis(?:es|ing|ed)?|preparation(?:s|al|ed|ing)?|process(?:es|ion|ing)?|method(?:s)?)\b', re.I)
def filter_with_kw(doc):
    return bool(exp_section_pattern.search(doc.metadata['sub_titles']))

@return_valid
@pydantic_validate_retry_wraps
def extract(doc, extract_program):
    para = doc.page_content
    title = doc.metadata['title']
    abstract = doc.metadata['abstract']
    sub_titles = doc.metadata['sub_titles']
    text = f'title:\n{title}\n' + 'subtitles:\n' + '\n'.join([f'{sub_title}' for sub_title in sub_titles.split('/')]) + f'\nparagraph:\n{para}'
    sub_titles_formatted = "\n".join(sub_titles.split('/'))
    context = f"title: {title}\nabstract: {abstract}\n{sub_titles_formatted}\nparagraph: {para}"
    return extract_program(text=text, context=context)

In [10]:
ARTICLE = '40_with_good_title'
TARGET = 'inorganic_with_validation_test'

In [11]:
from functools import partial
from sisyphus.chain import Filter, Writer
from sisyphus.chain.customized_elements import customized_extractor, customized_validator
from sisyphus.utils.helper_functions import get_plain_articledb, get_create_resultdb
from sisyphus.chain.chain_elements import run_chains_with_extarction_history_multi_threads

article_db = get_plain_articledb(ARTICLE)
article_getter = Filter(article_db, filter_func=filter_with_kw, with_abstract=True)
result_db = get_create_resultdb(TARGET, Reaction)
compiled_extractor = partial(extract, extract_program=compiled_program)
compiled_extract_el = customized_extractor(compiled_extractor, 'thread', 5)
# validator = customized_validator(resolve, 'thread', 5)
# cleaner = customized_validator(discard, 'thread', 5)

In [12]:
compiled_extract_chain = article_getter + compiled_extract_el
chain = compiled_extract_chain + Writer(result_db)

In [13]:
CHAR_TO_HTML_LBS = {
    '/': '&sol;',
    '\\': '&bs;',
    '?': '&qm;',
    '*': '&st;',
    ':': '&cl;',
    '|': '&vb;',
    '<': '&lt;',
    '>': '&gt;',
    '"': '&quot;',
    '\'': '&apos;'
}
# convert the doi to escaped doi, and add .html to the end
def doi_to_escaped_doi(doi):
    escaped_doi = doi
    for char, html in CHAR_TO_HTML_LBS.items():
        escaped_doi = escaped_doi.replace(char, html)
    return escaped_doi + '.html'

In [17]:
result_db.clear_tables()

In [8]:
run_chains_with_extarction_history_multi_threads(chain, 'articles_processed', 5, TARGET)

100%|██████████| 40/40 [01:44<00:00,  2.62s/it]


In [19]:
import json
with open('with_validation_test.json', 'w', encoding='utf-8') as f:
    json.dump(result_db.load_as_json('4o', QA.__doc__, TARGET, True), f, ensure_ascii=False, indent=2)

In [18]:
lm.inspect_history(15)





[34m[2024-12-30T11:34:34.725754][0m

[31mSystem message:[0m

Your input fields are:
1. `chemical_names` (str): chemical names separated by comma

Your output fields are:
1. `reasoning` (str)
2. `have` (bool): have acronym or un-identified chemical elements or none of them
3. `unidentified_names` (Union[list[str], NoneType]): acronyms and un-identified chemicals, return null if have is False

All interactions will be structured in the following way, with the appropriate values filled in.

[[ ## chemical_names ## ]]
{chemical_names}

[[ ## reasoning ## ]]
{reasoning}

[[ ## have ## ]]
{have}        # note: the value you produce must be True or False

[[ ## unidentified_names ## ]]
{unidentified_names}        # note: the value you produce must be pareseable according to the following JSON schema: {"anyOf": [{"type": "array", "items": {"type": "string"}}, {"type": "null"}]}

[[ ## completed ## ]]

In adhering to this structure, your objective is: 
        You are an chemist expert.