In [1]:
# Use the %pip magic so packages are installed into the environment of the running kernel.
# python-dotenv is listed explicitly because a later cell imports `dotenv`.
%pip install -U langchain-nomic langchain_community tiktoken langchainhub chromadb langchain langgraph tavily-python gpt4all firecrawl-py python-dotenv



In [None]:
# %pip targets the kernel's own environment
%pip install langchain_openai

In [None]:
# %pip targets the kernel's own environment
%pip install gpt4all

In [None]:
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# from langchain_community.vectorstores import Chroma
# from langchain_community.embeddings import GPT4AllEmbeddings
# from langchain_community.document_loaders import DirectoryLoader, PyMuPDFLoader
# from langchain_community.vectorstores.utils import filter_complex_metadata
# from langchain.docstore.document import Document


# source_folder = 'data'

# # Load all PDF files from the folder
# loader = DirectoryLoader(source_folder, glob="*.pdf", loader_cls=PyMuPDFLoader)
# docs = loader.load()

# # docs_list = [item for sublist in docs for item in sublist]

# text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
#     chunk_size=512, chunk_overlap=0
# )
# doc_splits = text_splitter.split_documents(docs)

# # Manually filter metadata
# filtered_docs = []
# for doc in doc_splits:
#     if isinstance(doc, Document) and hasattr(doc, 'metadata'):
#         clean_metadata = {
#             key: value for key, value in doc.metadata.items()
#             if isinstance(value, (str, int, float, bool))
#         }
#         filtered_docs.append(Document(page_content=doc.page_content, metadata=clean_metadata))

# vectorstore = Chroma.from_documents(
#     documents=filtered_docs,
#     collection_name="polaris-dummy003",
#     embedding=GPT4AllEmbeddings()
# )

# retriever = vectorstore.as_retriever()

In [2]:
import os

from dotenv import load_dotenv, find_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.document_loaders import FireCrawlLoader
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain.docstore.document import Document

# Secrets must not live in the notebook: load a local .env file (if present) and
# read the FireCrawl key from the environment. A key that was ever committed in
# plain text should be revoked and replaced.
load_dotenv(find_dotenv())
firecrawl_api_key = os.environ["FIRECRAWL_API_KEY"]  # raises KeyError if missing

# List of URLs to scrape
urls = [
    "https://www.ai-jason.com/learning-ai/how-to-reduce-llm-cost",
    "https://www.ai-jason.com/learning-ai/gpt5-llm",
    "https://www.ai-jason.com/learning-ai/how-to-build-ai-agent-tutorial-3",
]

# Scrape documents: each loader call returns a list, so this is a list of lists
docs_nested = [
    FireCrawlLoader(api_key=firecrawl_api_key, url=url, mode="scrape").load()
    for url in urls
]

# Flatten the per-URL lists into a single list of documents
docs = [doc for sublist in docs_nested for doc in sublist]

# Text splitter: 512-token chunks (tiktoken-based length), no overlap
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=512, chunk_overlap=0
)

# Split documents
doc_splits = text_splitter.split_documents(docs)

# Keep only scalar metadata values (str/int/float/bool); nested values are dropped
filtered_docs = []
for doc in doc_splits:
    if isinstance(doc, Document) and hasattr(doc, 'metadata'):
        clean_metadata = {
            key: value for key, value in doc.metadata.items()
            if isinstance(value, (str, int, float, bool))
        }
        filtered_docs.append(Document(page_content=doc.page_content, metadata=clean_metadata))

# Create Chroma vector store
vectorstore = Chroma.from_documents(
    documents=filtered_docs,
    collection_name="DummyDB012",
    embedding=GPT4AllEmbeddings()
)

# Create retriever
retriever = vectorstore.as_retriever()

In [3]:
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv, find_dotenv
import os

# Pull variables from the nearest .env file into the process environment
_ = load_dotenv(find_dotenv())
key = os.environ["OPENAI_API_KEY"]

# OpenAI chat model used by the chains in this notebook; temperature fixed at 0
llm = ChatOpenAI(model="gpt-4-turbo", temperature=0, openai_api_key=key)

In [4]:
# Retrieval Grader

from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser

# Prompt for the relevance grader. It takes the user question and the text of one
# retrieved document and asks for a JSON object with a single "score" key.
# The doubled braces are template escapes that render as literal braces.
prompt = PromptTemplate(
    template="""
    You are a grader assessing the relevance of a retrieved document to a user question. 
    If the document contains keywords related to the user question, grade it as relevant. 
    This does not need to be a stringent test; the goal is to filter out erroneous retrievals. 
    Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question. 
    Provide the binary score as a JSON object with a single key {{\"score\"}} and no preamble or explanation.

    Retrieved Document: 
    {document}

    User Question: 
    {question}

    Your response must be a JSON object: {{"score": "yes" or "no"}}
    """,
    input_variables=["question", "document"],
)

# Chain: fill the prompt, call the LLM, parse the reply as JSON
retrieval_grader = prompt | llm | JsonOutputParser()

# Retrieve document
question = "How to save llm cost"
docs = retriever.invoke(question)
# NOTE(review): index 1 selects the second retrieved chunk and assumes the retriever
# returned at least two results; `docs` is also rebound here after an earlier cell
# used the same name for the scraped pages.
doc_txt = docs[1].page_content 


# Smoke test: grade the selected chunk against the question
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

{'score': 'yes'}


In [5]:
# Answer Generation

def format_docs(docs):
    """
    Concatenate the text of several documents into one string.

    Args:
        docs: Iterable of objects exposing a ``page_content`` attribute.

    Returns:
        str: The ``page_content`` values separated by a blank line.
    """
    contents = [item.page_content for item in docs]
    return "\n\n".join(contents)

from langchain_core.output_parsers import StrOutputParser

# Prompt for answer generation: takes the question and the retrieved context and
# asks for a concise answer of at most three sentences.
# NOTE: this rebinds `prompt`, which the retrieval-grader cell also used.
prompt = PromptTemplate(
    template="""
    You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question.
    If you don't know the answer, just say that you don't know.
    Keep your response concise and limited to three sentences.

    Question: {question}
    Context: {context}

    Answer: 
    """,
    input_variables=["question", "context"],
)


# Chain: fill the prompt, call the LLM, return the reply as a plain string
rag_chain = prompt | llm | StrOutputParser()

# Retrieve document
question = "How to save llm cost"
docs = retriever.invoke(question)
# Format the retrieved documents
context = format_docs(docs) 

# Generate an answer from the joined context
generation = rag_chain.invoke({"context": context, "question": question})

print(generation)

To save on LLM costs, you can monitor and log costs using platforms like L Smith to identify optimization areas, regularly optimize usage as your user base grows, and select the right models for specific tasks to maintain performance while reducing expenses. Additionally, changing to a less expensive model for simpler tasks can significantly cut costs. Stay informed on new methods and technologies for further cost reduction opportunities.


In [6]:
from langchain_community.tools.tavily_search import TavilySearchResults
import os

# The Tavily key must come from the environment (for example the .env file loaded
# in an earlier cell); never hard-code it in the notebook. A key that was ever
# committed in plain text should be revoked and replaced.
if not os.environ.get("TAVILY_API_KEY"):
    raise RuntimeError(
        "TAVILY_API_KEY is not set; export it or add it to your .env file."
    )

# `max_results` is the tool's result-count field; the former `k=3` is not a field of
# this tool. NOTE(review): confirm against the installed langchain_community version.
web_search_tool = TavilySearchResults(max_results=3)

In [9]:
# Hallucination Grader

# Prompt for the hallucination grader: given a set of facts and a generated answer,
# ask for a JSON object whose "score" key is 'yes' when the answer is grounded in the facts.
# The doubled braces are template escapes that render as literal braces.
prompt = PromptTemplate(
    template="""
    You are a grader assessing whether an answer is factually supported by the provided set of facts.
    Give a binary score 'yes' or 'no' to indicate whether the answer is grounded in the given facts.
    Provide the binary score as a JSON object with a single key {{\"score\"}} and no preamble or explanation.


    Facts:
    -----------
    {documents}
    -----------

    Answer:
    -----------
    {generation}
    -----------

    Your response must be a JSON object: {{"score": "yes" or "no"}}
    """,
    input_variables=["generation", "documents"],
)

# Chain: fill the prompt, call the LLM, parse the reply as JSON
hallucination_grader = prompt | llm | JsonOutputParser()

# Invoke the Hallucination Grader.
# Grade against `context`, the full joined text that `generation` was produced from;
# checking against the single chunk `doc_txt` can reject an answer that is grounded
# in one of the other retrieved chunks.
hallucination_grader.invoke({"documents": context, "generation": generation})

{'score': 'yes'}

In [10]:
#  Answer Grader

# Prompt for the answer grader: given the generated answer and the question, ask for
# a JSON object whose "score" key is 'yes' when the answer is useful for the question.
# The doubled braces are template escapes that render as literal braces.
prompt = PromptTemplate(
    template="""
    You are a grader assessing whether an answer is useful to resolve the question and relavant to its context.
    Give a binary score 'yes' or 'no' to indicate whether the answer is useful in resolving the question.
    Provide the binary score as a JSON object with a single key {{\"score\"}} and no preamble or explanation.

    Answer:
    -----------
    {generation}
    -----------

    Question:
    -----------
    {question}
    -----------

    Your response must be a JSON object: {{"score": "yes" or "no"}}
    """,
    input_variables=["generation", "question"],
)

# Chain: fill the prompt, call the LLM, parse the reply as JSON
answer_grader = prompt | llm | JsonOutputParser()

# Smoke test using the question and generation from the answer-generation cell
answer_grader.invoke({"question": question, "generation": generation})

{'score': 'yes'}

In [None]:
from typing_extensions import TypedDict
from typing import List
from langchain.schema import Document

# Define GraphState
class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: the user question driving the run
        generation: LLM generation (the answer text)
        web_search: "Yes"/"No" flag telling the router whether to add web search
        documents: list of Document objects (retrieved chunks and web results)
    """
    question: str
    generation: str
    web_search: str
    # Nodes store Document instances here (they read `.page_content`), not plain strings.
    documents: List[Document]

# Define Nodes
def retrieve(state):
    """
    Look up documents for the current question in the vector store.

    Args:
        state (dict): The current graph state; must contain "question".

    Returns:
        dict: State update holding the retrieved "documents" and the unchanged "question".
    """
    print("----RETRIEVE----")
    query = state["question"]
    hits = retriever.invoke(query)
    return {"documents": hits, "question": query}

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Each document is graded individually. Irrelevant documents are dropped, and the
    web-search flag is raised if any document was dropped or if no relevant document
    remains (including the case where nothing was retrieved at all), so that
    generation never runs on an empty context.

    Args:
        state (dict): The current graph state; must contain "question" and "documents".

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
        ("Yes" or "No").
    """

    print("----CHECK DOCUMENT RELEVANCE TO QUESTION----")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    web_search = "No"

    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        # Tolerate a missing key or stray whitespace/casing in the grader's JSON reply;
        # anything other than "yes" counts as not relevant.
        grade = str(score.get("score", "")).strip().lower()

        if grade == "yes":
            print("----GRADE: DOCUMENT RELEVANT----")
            filtered_docs.append(d)
        else:
            print("----GRADE: DOCUMENT NOT RELEVANT----")
            # Drop the document and ask for a web search to compensate
            web_search = "Yes"

    # Nothing usable (e.g. the retriever returned no documents): fall back to web search
    if not filtered_docs:
        web_search = "Yes"

    return {"documents": filtered_docs, "question": question, "web_search": web_search}


def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state; must contain "question" and "documents".

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("==GENERATE==")
    question = state["question"]
    documents = state["documents"]

    # RAG generation. Pass the joined page text, as the standalone RAG example does;
    # handing the raw Document list to the prompt would embed its repr
    # (metadata, escaped newlines) in the context.
    context = format_docs(documents)
    generation = rag_chain.invoke({"context": context, "question": question})

    return {"documents": documents, "question": question, "generation": generation}

   
def web_search(state):
    """
    Web search based on the question.

    Args:
        state (dict): The current graph state; must contain "question" and may
            contain "documents".

    Returns:
        state (dict): A new documents list with the web results appended (when the
        search produced any content), plus the unchanged question.
    """

    print("----WEB SEARCH----")
    question = state["question"]
    # .get() keeps the "no documents yet" case reachable instead of raising KeyError
    existing = state.get("documents") or []

    # Web search
    results = web_search_tool.invoke({"query": question})
    # NOTE(review): the tool is expected to return a list of dicts with a "content"
    # key; anything else (e.g. an error string) is skipped here - confirm against
    # the installed tool version.
    snippets = [r["content"] for r in results if isinstance(r, dict) and "content" in r]

    # Build a fresh list rather than appending to the list held in the graph state
    documents = list(existing)
    if snippets:
        documents.append(Document(page_content="\n".join(snippets)))

    return {"documents": documents, "question": question}

### Conditional edges
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search.

    The decision depends only on the "web_search" flag in the state.

    Args:
        state (dict): The current graph state; must contain "web_search".

    Returns:
        str: "websearch" when the flag is "Yes", otherwise "generate"
    """

    print("----ASSESS GRADED DOCUMENTS----")
    web_search = state["web_search"]

    if web_search == "Yes":
        # At least one retrieved document was judged irrelevant,
        # so supplement the context with a web search
        print("----DECISION: NOT ALL RETRIEVED DOCUMENTS ARE RELEVANT TO QUESTION, INCLUDE WEB SEARCH----")
        return "websearch"
    else:
        # The flag was not raised, so generate the answer
        print("----DECISION: GENERATE----")
        return "generate"
    
### Conditional edges
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the answer is grounded in the documents and is relevant to the question.

    Args:
        state (dict): The current graph state; must contain "question", "documents"
            and "generation".

    Returns:
        str: "useful" when the answer is grounded and addresses the question,
        "not useful" when it is grounded but does not address the question,
        "not supported" when it is not grounded in the documents.
    """
    print("==CHECK HALLUCINATIONS==")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    # Grade against the joined page text (the same form the generator should see),
    # not the repr of the Document list.
    score = hallucination_grader.invoke(
        {"documents": format_docs(documents), "generation": generation}
    )
    # Compare case-insensitively, matching how document relevance grades are checked
    grade = str(score.get("score", "")).strip().lower()

    # Check hallucination
    if grade != "yes":
        print("==DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY==")
        return "not supported"

    print("==DECISION: GENERATION IS GROUNDED IN DOCUMENTS==")
    # Check question-answering
    print("==GRADE GENERATION vs QUESTION==")
    score = answer_grader.invoke({"question": question, "generation": generation})
    grade = str(score.get("score", "")).strip().lower()
    if grade == "yes":
        print("==DECISION: GENERATION ADDRESSES QUESTION==")
        return "useful"

    print("==DECISION: GENERATION DOES NOT ADDRESS QUESTION==")
    return "not useful"


In [12]:
from langgraph.graph import END, StateGraph
# Graph builder over the shared state schema
workflow = StateGraph(GraphState)

# Register each node function under the name the edges refer to
graph_nodes = [
    ("websearch", web_search),          # web search
    ("retrieve", retrieve),             # retrieve
    ("grade_documents", grade_documents),  # grade documents
    ("generate", generate),             # generate
]
for node_name, node_fn in graph_nodes:
    workflow.add_node(node_name, node_fn)

# Show the builder as the cell result
workflow

<langgraph.graph.state.StateGraph at 0x165ee4c6a50>

In [13]:
# Build graph
# Every run starts at "retrieve" and always proceeds to "grade_documents".
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")

# After grading, decide_to_generate returns "websearch" or "generate";
# the mapping below sends each label to the node of the same name.
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)

# Web search results always feed into generation.
workflow.add_edge("websearch", "generate")

# After generation, grade_generation_v_documents_and_question returns one of three labels.
# NOTE(review): "not supported" routes back to "generate" with the same inputs and
# nothing here caps the number of retries - presumably this relies on LangGraph's
# recursion limit to stop a loop; confirm.
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)


<langgraph.graph.state.StateGraph at 0x165ee4c6a50>

In [14]:
# Compile
app = workflow.compile()

# Test
from pprint import pprint
inputs = {"question": "how to save llm cost?"}

# Track the most recent generation explicitly instead of relying on the loop
# variable `value` still being bound (and holding a "generation") after the loop.
final_generation = None
for output in app.stream(inputs):
    # Each streamed item is keyed by the node that just finished
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
        if isinstance(value, dict) and "generation" in value:
            final_generation = value["generation"]
print(final_generation)

----RETRIEVE----
'Finished running: retrieve:'
----CHECK DOCUMENT RELEVANCE TO QUESTION----
----GRADE: DOCUMENT RELEVANT----
----GRADE: DOCUMENT RELEVANT----
----GRADE: DOCUMENT RELEVANT----
----GRADE: DOCUMENT RELEVANT----
----ASSESS GRADED DOCUMENTS----
----DECISION: GENERATE----
'Finished running: grade_documents:'
==GENERATE==
==CHECK HALLUCINATIONS==
==DECISION: GENERATION IS GROUNDED IN DOCUMENTS==
==GRADE GENERATION vs QUESTION==
==DECISION: GENERATION ADDRESSES QUESTION==
'Finished running: generate:'
To save on LLM costs, consider using observability platforms to monitor and log costs, allowing for identification and optimization of cost-heavy areas. Implement strategies such as selecting the right model for specific tasks, using smaller models for less complex queries, and continuously optimizing LLM usage. These methods can potentially reduce LLM costs by up to 78% or more while maintaining performance and user experience.
