# information extractor


In [23]:
from langchain_openai import ChatOpenAI
from typing import Optional, List
from langchain_core.pydantic_v1 import BaseModel, Field
import os
import chromadb
from langchain.retrievers import (
    ContextualCompressionRetriever,
    MergerRetriever,
)
from langchain_chroma import Chroma
from langchain_community.document_transformers import (
    EmbeddingsRedundantFilter,
    LongContextReorder
)
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import uuid
from typing import List, TypedDict
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_community.document_loaders import PyPDFLoader
from langchain_experimental.text_splitter import SemanticChunker
from langchain.retrievers.document_compressors import EmbeddingsFilter, DocumentCompressorPipeline

In [24]:
# Load KEY=VALUE pairs from the local ".env" file into the process environment,
# then read the OpenAI key back out of the environment.
with open(".env", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        # Blank lines and "#" comments carry no assignment; skip them rather
        # than failing to unpack them.
        if not line or line.startswith("#"):
            continue
        # Split on the FIRST "=" only, so values that themselves contain "="
        # (e.g. base64-padded secrets) are kept intact.
        key, sep, value = line.partition("=")
        if not sep:
            # Same exception type the tuple-unpacking form raised for such lines.
            raise ValueError("Malformed line in .env: expected KEY=VALUE")
        os.environ[key.strip()] = value.strip()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


In [25]:
# Chat model client used by both extraction chains (`runnable` and
# `runnable_hard`); it is configured with temperature 0 and the API key read
# from the environment above.
llm = ChatOpenAI(
    model_name="gpt-4-turbo",
    temperature=0,
    # max_tokens=2000,
    api_key = OPENAI_API_KEY
)

# The prompt that enables few-shot in-context learning for information extraction

In [26]:
# Extraction prompt, in order: a fixed system instruction, the few-shot example
# messages passed in under the "examples" key, and finally the passage to
# extract from, passed in under the "text" key.
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are an expert extraction algorithm. "
            "Only extract relevant information from the text. "
            "If you do not know the value of an attribute asked "
            "to extract, return null for the attribute's value.",
        ),
        # ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
        MessagesPlaceholder("examples"),  # <-- EXAMPLES!
        # ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
        ("human", "{text}"),
    ]
)

In [27]:
class Example(TypedDict):
    """A representation of an example consisting of text input and expected tool calls.

    For extraction, the tool calls are represented as instances of pydantic model.

    Note: ``tool_example_to_messages`` additionally reads an optional
    ``"tool_outputs"`` key through ``example.get("tool_outputs")``; that key is
    not declared on this TypedDict.
    """

    input: str  # This is the example text
    tool_calls: List[BaseModel]  # Instances of pydantic model that should be extracted


def tool_example_to_messages(example: Example) -> List[BaseMessage]:
    """Turn one few-shot example into the chat messages that demonstrate it.

    The returned list is, in order:

    1) a HumanMessage holding ``example["input"]`` (the text to extract from);
    2) an AIMessage with empty content whose ``additional_kwargs["tool_calls"]``
       carries one function call per entry of ``example["tool_calls"]``;
    3) one ToolMessage per function call, acknowledging that call.

    The ToolMessages are included because some chat models are tuned for
    agent-style tool use rather than for plain extraction.

    Args:
        example: Mapping with ``"input"`` and ``"tool_calls"``; an optional
            ``"tool_outputs"`` list supplies the ToolMessage contents.

    Returns:
        The messages for this single example.
    """
    conversation: List[BaseMessage] = [HumanMessage(content=example["input"])]

    # One OpenAI-style function call per expected pydantic instance. The
    # function name is the pydantic class name and the arguments are the
    # instance serialised to JSON; each call gets a fresh random id.
    calls = [
        {
            "id": str(uuid.uuid4()),
            "type": "function",
            "function": {
                "name": type(instance).__name__,
                "arguments": instance.json(),
            },
        }
        for instance in example["tool_calls"]
    ]
    conversation.append(
        AIMessage(content="", additional_kwargs={"tool_calls": calls})
    )

    # Fall back to a generic acknowledgement per call when the example does
    # not provide its own tool outputs (missing key or empty value).
    replies = example.get("tool_outputs")
    if not replies:
        replies = ["You have correctly called this tool."] * len(calls)

    conversation.extend(
        ToolMessage(content=reply, tool_call_id=call["id"])
        for reply, call in zip(replies, calls)
    )
    return conversation

# Define the CE schema and its few-shot examples

In [28]:
# pydantic style of Electrolyte schema
# Easy-schema electrolyte: only the names of the solvents and the salts.
# Both fields are declared Optional with Field(...), i.e. no default value is
# given, but None is an accepted value.
# NOTE(review): the class docstring and the Field descriptions presumably end
# up in the schema handed to the LLM by with_structured_output — treat them as
# prompt text and edit with care.
class Electrolyte(BaseModel):
    '''An electrolyte is a medium containing ions that are electrically conductive through the movement of those ions includes solubal salts.'''
    solvent: Optional[List[str]] = Field(..., description="The solvent in which the compound is dissolved.")
    salt: Optional[List[str]] = Field(..., description="The salt that is dissolved in the solvent. Usually, the salt is a lithium salt.")

# Easy-schema record: one Coulombic-efficiency value together with the
# electrolyte it was reported for. Either field may be None.
# NOTE(review): docstring and Field descriptions are presumably sent to the
# LLM as part of the schema — treat them as prompt text.
class CE(BaseModel):
    '''the charge efficiency by which electrons are transferred in batteries'''
    ce: Optional[float] = Field(..., description="The Coulombic efficiency in percentage.")
    electrolyte: Optional[Electrolyte] = Field(..., description="The electrolyte used in the battery.")

# Top-level easy schema passed to with_structured_output for `runnable`:
# a list of CE records extracted from one piece of text.
class Data(BaseModel):
    '''extract data about CE'''
    data: List[CE]

# Few-shot examples for the easy schema. Each entry is a (source text,
# expected Data instance) pair; the raw strings are passages used verbatim as
# the human message of the example.
examples = [
    (
        r'''The 1M LiPF6 EC:DEC 1:2, EC:DEC 1:1, FEC:DEC 1:1, and
FEC:TFEC 1:1 electrolytes were also screened in Cu||Li coin cells,
with the lithium metal cycling efficiencies for each summarized in
Figure 7b. Since the pseudopotentiostat used to test the combi cell
does not operate galvanostatically, it is difficult to precisely control the
plating/stripping current and capacity. The average plating capacity for
the combi cell tests was approximately 1 mAh cm−2 and the average
plating current was approximately 1 mA cm−2 . In an effort to mimic
these conditions for the Cu||Li coin cell tests, 1 mAh cm−2 of lithium
was plated and stripped at a constant current of 1 mA cm−2 . The same
general trend shown in the combi cell is also demonstrated by the coin
cell results. The control type electrolytes (EC:DEC 1:2 and 1:1) show
low and very unstable cycling efficiency. The CE of the EC:DEC 1:2
electrolyte declines and becomes unstable after only 10 cycles, while
the EC:DEC 1:1 does the same after about 25 cycles. The average CE
for these electrolytes over 40 cycles was only 83.7% and 90.3% for
the 1:2 and 1:1 EC:DEC respectively.''',
        Data(data=[CE(ce=83.7, electrolyte=Electrolyte(solvent=["EC", "DEC"], salt=["LiPF6"])), CE(ce=90.3, electrolyte=Electrolyte(solvent=["EC", "DEC"], salt=["LiPF6"]))]),
    ),
    (
        # This passage names no solvent, so the expected output uses an empty solvent list.
        r"As shown in Fig. S8,† the coulombic efficiency of the LiTFPFB based cell is 80.6% after 50 cycles, which is higher than that of the LiBF4 based one (60.3%), indicating improved stability of the LiTFPFB based electrolyte. ",
        Data(data=[CE(ce=80.6, electrolyte=Electrolyte(solvent=[], salt=["LiTFPFB"])), CE(ce=60.3, electrolyte=Electrolyte(solvent=[], salt=["LiBF4"]))]),
    )
]

# Flatten every example into chat messages; this list is what gets passed as
# the "examples" value when the easy-schema chain is invoked.
messages = []
for text, tool_call in examples:
    messages.extend(tool_example_to_messages({"input": text, "tool_calls": [tool_call]}))

# Define the hard schema, which tries to extract more information

In [29]:
# pydantic style of Electrolyte schema
# Hard-schema solvent entry: one solvent name plus its share of the mixture.
# The few-shot examples in this file give volume_percent as a fraction
# (e.g. 0.33, 0.5), not as a 0-100 percentage.
# NOTE(review): Field descriptions are presumably sent to the LLM as part of
# the schema — treat them as prompt text.
class Solvent(BaseModel):
    solvent: Optional[str] = Field(..., description="The solvent in which the compound is dissolved.")
    volume_percent: Optional[float] = Field(..., description="The volume percentage of the solvent in the electrolyte. The sum of volume_percents across the solvents should be around 1.")
    # mol_L: Optional[float] = Field(..., description="The molarity of the solvent in the electrolyte.")

# Hard-schema salt entry: one salt name plus its molarity. The few-shot
# examples pass mol_L=None when the text states no concentration.
# NOTE(review): Field descriptions are presumably sent to the LLM as part of
# the schema — treat them as prompt text.
class Salt(BaseModel):
    salt: Optional[str] = Field(..., description="The salt that is dissolved in the solvent. Usually, the salt is a lithium salt.")
    # volume_percent: Optional[float] = Field(..., description="The volume percentage of the salt in the electrolyte. The sum of volume_percents across the salts should be around 1.")
    mol_L: Optional[float] = Field(..., description="The molarity of the salt in the electrolyte. Usually this is mentioned in the form of 'x M', e.g., '1 M'.")

# Hard-schema electrolyte: structured Solvent and Salt entries instead of the
# bare name lists used by Electrolyte.
# NOTE(review): docstring and Field descriptions are presumably sent to the
# LLM as part of the schema — treat them as prompt text.
class Electrolyte_H(BaseModel):
    '''An electrolyte is a medium containing ions that are electrically conductive through the movement of those ions includes solubal salts.'''
    solvent_list: Optional[List[Solvent]] = Field(..., description="List of the solvents.")
    salt_list: Optional[List[Salt]] = Field(..., description="List of the salts.")

# Hard-schema record: one Coulombic-efficiency value together with its
# structured electrolyte. Either field may be None (the few-shot examples
# include a record with ce=None).
class CE_H(BaseModel):
    '''the charge efficiency by which electrons are transferred in batteries'''
    ce: Optional[float] = Field(..., description="The Coulombic efficiency in percentage.")
    electrolyte: Optional[Electrolyte_H] = Field(..., description="The electrolyte used in the battery.")
    # temperature: Optional[float] = Field(..., description="The temperature at which the battery was tested.")

# Top-level hard schema passed to with_structured_output for `runnable_hard`:
# a list of CE_H records extracted from one piece of text.
class Data_H(BaseModel):
    '''extract data about CE, H stands for hard schema, which includes the volume of the solvent/salt in the electrolyte.'''
    data: List[CE_H]

# Few-shot examples for the hard schema. Each entry is a (source text,
# expected Data_H instance) pair; the first two passages are the same ones
# used for the easy schema, annotated with solvent fractions and molarity.
examples_hard = [
    (
        r'''The 1M LiPF6 EC:DEC 1:2, EC:DEC 1:1, FEC:DEC 1:1, and
FEC:TFEC 1:1 electrolytes were also screened in Cu||Li coin cells,
with the lithium metal cycling efficiencies for each summarized in
Figure 7b. Since the pseudopotentiostat used to test the combi cell
does not operate galvanostatically, it is difficult to precisely control the
plating/stripping current and capacity. The average plating capacity for
the combi cell tests was approximately 1 mAh cm−2 and the average
plating current was approximately 1 mA cm−2 . In an effort to mimic
these conditions for the Cu||Li coin cell tests, 1 mAh cm−2 of lithium
was plated and stripped at a constant current of 1 mA cm−2 . The same
general trend shown in the combi cell is also demonstrated by the coin
cell results. The control type electrolytes (EC:DEC 1:2 and 1:1) show
low and very unstable cycling efficiency. The CE of the EC:DEC 1:2
electrolyte declines and becomes unstable after only 10 cycles, while
the EC:DEC 1:1 does the same after about 25 cycles. The average CE
for these electrolytes over 40 cycles was only 83.7% and 90.3% for
the 1:2 and 1:1 EC:DEC respectively.''',
        # Data(data=[CE(ce=83.7, electrolyte=Electrolyte(solvent_list=["EC", "DEC"], salt=["LiPF6"])), CE(ce=90.3, electrolyte=Electrolyte(solvent=["EC", "DEC"], salt=["LiPF6"]))]),
        # The 1:2 and 1:1 ratios from the text are encoded as fractions 0.33/0.67 and 0.5/0.5.
        Data_H(
            data=[
                    CE_H(ce=83.7, electrolyte=Electrolyte_H(solvent_list=[Solvent(solvent="EC", volume_percent=0.33), Solvent(solvent="DEC", volume_percent=0.67)], salt_list=[Salt(salt="LiPF6",mol_L= 1)])), 
                    CE_H(ce=90.3, electrolyte=Electrolyte_H(solvent_list=[Solvent(solvent="EC", volume_percent=0.5), Solvent(solvent="DEC", volume_percent=0.5)], salt_list=[Salt(salt="LiPF6", mol_L= 1)])),
                ]
        ),
    ),
    (
        r"As shown in Fig. S8,† the coulombic efficiency of the LiTFPFB based cell is 80.6% after 50 cycles, which is higher than that of the LiBF4 based one (60.3%), indicating improved stability of the LiTFPFB based electrolyte. ",
        # Data(data=[CE(ce=80.6, electrolyte=Electrolyte(solvent=[], salt=["LiTFPFB"])), CE(ce=60.3, electrolyte=Electrolyte(solvent=[], salt=["LiBF4"]))]),
        # No solvent or concentration is stated here: empty solvent list and mol_L=None.
        Data_H(
            data=[
                CE_H(ce=80.6, electrolyte=Electrolyte_H(solvent_list=[], salt_list = [Salt(salt="LiTFPFB", mol_L=None)])), 
                CE_H(ce=60.3, electrolyte=Electrolyte_H(solvent_list=[], salt_list = [Salt(salt="LiBF4", mol_L=None)])),
            ]
        ),
    ),
    (
        # Composition-only example with no efficiency value, hence ce=None.
        # NOTE(review): the text gives FEC as 5 wt%, but it is recorded as
        # volume_percent=0.05, so the fractions here sum to 1.05 — confirm
        # this is the intended convention for additives.
        r'''1 M LiTFSI EC-DMC (1:1 v) 5wt% FEC''',
        Data_H(
            data=[
                CE_H(ce=None, electrolyte=Electrolyte_H(solvent_list=[Solvent(solvent="EC", volume_percent=0.5), Solvent(solvent="DMC", volume_percent=0.5), Solvent(solvent="FEC", volume_percent=0.05)],
                salt_list=[Salt(salt="LiTFSI", mol_L=1)])) 
            ]
        )
    ),
    
]

# Flatten every example into chat messages; this list is what gets passed as
# the "examples" value when the hard-schema chain is invoked.
messages_hard = []
for text, tool_call in examples_hard:
    messages_hard.extend(tool_example_to_messages({"input": text, "tool_calls": [tool_call]}))

# define the pipeline of Information Extraction

In [30]:
# Easy-schema chain: the prompt piped into the LLM, with the output structured
# as a `Data` instance via function calling. include_raw=False is passed so
# only the structured result is requested, not the raw model message.
runnable = prompt | llm.with_structured_output(
    schema=Data,
    method="function_calling",
    include_raw=False,
)

# Hard-schema chain: identical wiring, but structured as `Data_H`.
runnable_hard = prompt | llm.with_structured_output(
    schema=Data_H,
    method="function_calling",
    include_raw=False,
)

In [38]:
# Get 3 diff embeddings.
# Build the three embedding back-ends: two HuggingFace models (materials and
# biomedical BERT variants) and OpenAI embeddings. All three back a vector
# store in inject_lotr; material_embedding also drives the semantic chunker
# and filter_embeddings the redundancy filter.
import torch  # presumably already installed as a dependency of the HuggingFace embedding stack

# Prefer the first GPU, but fall back to CPU so the notebook still runs on
# machines without CUDA instead of failing while loading the models.
model_kwargs = {'device': 'cuda:0' if torch.cuda.is_available() else 'cpu'}
encode_kwargs = {'normalize_embeddings': True}
material_embedding = HuggingFaceEmbeddings(model_name="pranav-s/MaterialsBERT", model_kwargs=model_kwargs, encode_kwargs=encode_kwargs)
bio_embedding = HuggingFaceEmbeddings(model_name="dmis-lab/biobert-v1.1", model_kwargs=model_kwargs, encode_kwargs=encode_kwargs)
filter_embeddings = OpenAIEmbeddings()


  from .autonotebook import tqdm as notebook_tqdm
No sentence-transformers model found with name pranav-s/MaterialsBERT. Creating a new one with MEAN pooling.
Some weights of BertModel were not initialized from the model checkpoint at pranav-s/MaterialsBERT and are newly initialized: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
No sentence-transformers model found with name dmis-lab/biobert-v1.1. Creating a new one with MEAN pooling.


# Wrap the code into functions

In [35]:
from langchain_text_splitters import TokenTextSplitter
def parse_pdf(path, filtering_references = True):
    """Load a PDF and split it into chunks.

    Args:
        path: Location of the PDF file handed to PyPDFLoader.
        filtering_references: When true, every page after the last page whose
            text contains "references" or "bibliography" (case-insensitive)
            is dropped before splitting. That page itself is kept. If no page
            matches, nothing is dropped.

    Returns:
        The list of split documents. Splitting uses SemanticChunker with
        ``material_embedding``; if that raises ZeroDivisionError, a
        TokenTextSplitter (chunk_size=50, chunk_overlap=5) is used instead.
    """
    semantic_splitter = SemanticChunker(
        material_embedding,
        breakpoint_threshold_type="percentile",
        number_of_chunks=5,
    )
    pages = PyPDFLoader(path).load()

    # Scan from the back for the last page mentioning a reference-section
    # keyword; without a match the cut-off stays at the final page.
    last_kept = len(pages) - 1
    for index in reversed(range(len(pages))):
        lowered = pages[index].page_content.lower()
        if "references" in lowered or "bibliography" in lowered:
            last_kept = index
            break

    if filtering_references:
        pages = pages[: last_kept + 1]

    try:
        return semantic_splitter.split_documents(pages)
    except ZeroDivisionError:
        # Semantic chunking failed on this input; fall back to fixed-size token chunks.
        fallback_splitter = TokenTextSplitter(chunk_size=50, chunk_overlap=5)
        return fallback_splitter.split_documents(pages)

def inject_lotr(documents):
    """Index the chunks in three Chroma collections and return a merged retriever.

    One non-persistent Chroma collection is opened per embedding back-end
    (materials BERT, bio BERT, OpenAI). Any ids already present in a
    collection are deleted first, then ``documents`` is added to each one.

    Args:
        documents: The chunked documents to index.

    Returns:
        A MergerRetriever ("Lord of the Retrievers") combining one similarity
        retriever (k=4) per collection. Redundancy filtering and reordering
        are not applied here; extract_info does that on the retrieved docs.
    """
    print("chunked documents: ", len(documents))

    chroma_settings = chromadb.config.Settings(
        is_persistent=False,
        anonymized_telemetry=False,
    )

    # (collection name, embedding function) for each of the three stores.
    store_specs = (
        ("project_store_material", material_embedding),
        ("project_store_bio", bio_embedding),
        ("project_store_openai", filter_embeddings),
    )
    stores = [
        Chroma(
            collection_name=collection,
            client_settings=chroma_settings,
            embedding_function=embedding,
        )
        for collection, embedding in store_specs
    ]

    # Empty each collection so entries from an earlier call are not retrieved.
    for store in stores:
        stale_ids = store.get()['ids']
        if stale_ids:
            store.delete(stale_ids)

    # Each retriever gets its own search_kwargs dict, since callers adjust "k"
    # per retriever later on.
    sub_retrievers = [
        store.as_retriever(search_type="similarity", search_kwargs={"k": 4})
        for store in stores
    ]
    lotr = MergerRetriever(retrievers=sub_retrievers)

    for store in stores:
        store.add_documents(documents)
    return lotr

def extract_info(retriever, query_list, query_compress,hard_schema=False):
    """Retrieve context for the queries and run the structured extraction on it.

    Args:
        retriever: Object exposing ``retrievers`` (each with a mutable
            ``search_kwargs`` dict) and ``invoke(query)``.
        query_list: Queries whose retrieved documents are pooled together.
        query_compress: Query passed to the compressor pipeline.
        hard_schema: Selects ``runnable_hard`` with ``messages_hard`` when
            truthy, otherwise ``runnable`` with ``messages``.

    Returns:
        Whatever the selected chain returns for the pooled context.
    """
    retrieved = []
    for query in query_list:
        top_k = 4
        # Up to three attempts per query, halving k after each RuntimeError;
        # if all three fail the query contributes no documents.
        for _attempt in range(3):
            try:
                for sub_retriever in retriever.retrievers:
                    sub_retriever.search_kwargs["k"] = top_k
                retrieved.extend(retriever.invoke(query))
                break
            except RuntimeError as err:
                top_k = int(top_k / 2)
                print(err, "retrying with k = ", top_k)

    # Drop redundant documents across the pooled results, then reorder them.
    compressor = DocumentCompressorPipeline(
        transformers=[
            EmbeddingsRedundantFilter(embeddings=filter_embeddings),
            LongContextReorder(),
        ]
    )
    compressed = compressor.compress_documents(retrieved, query=query_compress)
    context = "\n".join(doc.page_content for doc in compressed)

    if hard_schema:
        print("extracting with hard mode")
        return runnable_hard.invoke({"text": context, "examples": messages_hard})
    print("extracting with easy mode")
    return runnable.invoke({"text": context, "examples": messages})

def extract_info_from_pdf(path, query_list, query_compress, hard_schema=False):
    """Run the full pipeline on one PDF: chunk it, index it, extract from it.

    Args:
        path: Location of the PDF file.
        query_list: Retrieval queries forwarded to extract_info.
        query_compress: Compression query forwarded to extract_info.
        hard_schema: Forwarded to extract_info to pick the schema.

    Returns:
        The result of extract_info for this PDF.
    """
    # parse_pdf -> chunks, inject_lotr -> merged retriever, extract_info -> result
    merged_retriever = inject_lotr(parse_pdf(path))
    return extract_info(
        merged_retriever,
        query_list,
        query_compress,
        hard_schema=hard_schema,
    )

# Retrieval queries used for every paper; the documents retrieved for all of
# them are pooled into a single context before extraction.
queries = ['What is the electrolyte?', 'What solvents are included in the electrolyte(s)?', 'What is the Coulombic efficiency(CE)?', 'Under what conditions (e.g., temperature, voltage) was the electrolyte tested?']


# Information Extraction

In [38]:
# Run the hard-schema extraction on every PDF in the papers folder and cache
# each result as "<name>_hard.pkl"; papers that already have a cache file are
# skipped, so the cell can be re-run to resume.
import os
import pickle
save_path = "database/extracte_info"
# Use ONE folder for both listing and loading so the two cannot drift apart.
# NOTE(review): "database/papers" is the folder the PDFs are actually opened
# from; confirm it is the intended input directory.
papers_dir = "database/papers"
# Opening the cache file for writing fails if the output folder is missing.
os.makedirs(save_path, exist_ok=True)
# Sorted for a reproducible processing order; os.listdir order is arbitrary.
files = sorted(os.listdir(papers_dir))
for file in files:
    if not file.endswith(".pdf"):
        continue
    print(file)
    # splitext removes only the final extension ("a.b.pdf" -> "a.b"), so
    # papers with dots in their names do not end up sharing one cache file.
    filename = os.path.splitext(file)[0]
    out_path = f"{save_path}/{filename}_hard.pkl"
    if os.path.exists(out_path):
        continue
    response_H = extract_info_from_pdf(os.path.join(papers_dir, file), queries,"What information is related to Coulombic efficiency(CE)?", hard_schema=True)
    # Convert once and reuse for both the cache file and the log line.
    result = response_H.dict()
    with open(out_path, "wb") as f:
        pickle.dump(result, f)
    print(result)

1.pdf
10.pdf
11.pdf
12.pdf
13.pdf
14.pdf
15.pdf
16.pdf
17.pdf
18.pdf
19.pdf
2.pdf
20.pdf
21.pdf
22.pdf
23.pdf
24.pdf
25.pdf
26.pdf
27.pdf
28.pdf
29.pdf
chunked documents:  10
extracting with hard mode
{'data': []}
3.pdf
chunked documents:  36
extracting with hard mode
{'data': [{'ce': 99.98, 'electrolyte': {'solvent_list': [{'solvent': 'FDMB', 'volume_percent': 1.0}], 'salt_list': [{'salt': 'LiFSI', 'mol_L': 1.0}]}}]}
30.pdf
chunked documents:  55
extracting with hard mode
{'data': [{'ce': 94.28, 'electrolyte': {'solvent_list': [{'solvent': 'EC', 'volume_percent': 0.5}, {'solvent': 'DEC', 'volume_percent': 0.5}], 'salt_list': [{'salt': 'LiFSI', 'mol_L': 1.0}]}}, {'ce': 99.5, 'electrolyte': {'solvent_list': [{'solvent': 'DEE', 'volume_percent': 0.2}, {'solvent': 'BTFE', 'volume_percent': 0.8}], 'salt_list': [{'salt': 'LiFSI', 'mol_L': 1.8}]}}]}
31.pdf
chunked documents:  45
extracting with hard mode
{'data': [{'ce': 98.1, 'electrolyte': {'solvent_list': [{'solvent': 'DME', 'volume_perce