# Env Variables & Params 

In [3]:
import os
from dotenv import load_dotenv
from utils.config import CONFIG

load_dotenv(dotenv_path=".env.local")

LANGCHAIN_TRACING_V2 = os.getenv('LANGCHAIN_TRACING_V2')
LANGCHAIN_ENDPOINT = os.getenv('LANGCHAIN_ENDPOINT')
LANGCHAIN_API_KEY = os.getenv('LANGCHAIN_API_KEY')
LANGCHAIN_PROJECT = os.getenv('LANGCHAIN_PROJECT')

FIRECRAWL_API_KEY = os.getenv('FIRECRAWL_API_KEY')
TAVILY_API_KEY = os.getenv('TAVILY_API_KEY')


CHUNK_SIZE = CONFIG["CHUNK_SIZE"]
CHUNK_OVERLAP = CONFIG["CHUNK_OVERLAP"]
LOCAL_LLM = CONFIG["LOCAL_LLM"]
URLS = CONFIG["URLS"]

# Retrieve Documents

In [5]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.document_loaders.firecrawl import FireCrawlLoader
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain.docstore.document import Document
import requests

def scrape_with_firecrawl(url):
    endpoint = "https://api.firecrawl.dev/v1/scrape"
    headers = {
        "Authorization": f"Bearer {FIRECRAWL_API_KEY}",
        "Content-Type": "application/json"
    }
    body = {
        "url": url,
        # "mode": "scrape", # deprecated
        "formats": ["markdown"], # Request markdown directly
        "onlyMainContent": True,
        "removeBase64Images": True,
        "blockAds": True,
        "proxy": "basic",
        "timeout": 30000
    }
    response = requests.post(endpoint, headers=headers, json=body)

    if response.status_code == 403:
        print(f"[❌ BLOCKED] {url} - This site is not supported by Firecrawl.")
        return None
    if response.status_code != 200:
        print(f"[❌ FAIL] {url} - Status: {response.status_code} - {response.text}")
        return None
    
    result = response.json()
    content = result.get("data", {}).get("markdown", "").strip()

    if not content:
        print(f"[⚠️] No content extracted from {url}")
        return None

    return Document(page_content=content, metadata={"url": url})

docs = []
for url in URLS:
    try:
        doc = scrape_with_firecrawl(url)
        if doc and doc.page_content:
            docs.append(doc)
            print(f"[✅] Scraped: {url}")
        print(f"[ℹ️] Content length: {len(doc.page_content) if doc else 0}")
    except Exception as e:
        print(f"[❌] Failed: {url} - {e}")

# split docs
try:
    if docs and isinstance(docs[0], list):
        docs_list = [item for sublist in docs for item in sublist]
        print(f"[ℹ️] Flattened docs count: {len(docs_list)}")
    else:
        docs_list = docs
        print(f"[ℹ️] Docs already flat, count: {len(docs_list)}")
except Exception as e:
    print(f"[❌] Error flattening docs list: {e}")
    docs_list = docs  # fallback

try:
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=CHUNK_SIZE, 
        chunk_overlap=CHUNK_OVERLAP
    )
    docs_split = text_splitter.split_documents(docs_list)
    print(f"[ℹ️] Split into {len(docs_split)} document chunks")
except Exception as e:
    print(f"[❌] Error during document splitting: {e}")

# filter out complex metadata, ensure proper doc format
filtered_docs = []
for i, doc in enumerate(docs_split):
    try:
        # ensure doc is an instance of Document & has a 'metadata' attribute
        if isinstance(doc, Document) and hasattr(doc, 'metadata'):
            clean_metadata = {k: v for k, v in doc.metadata.items() if not isinstance(v, (str, int, float, bool))}
            filtered_doc = Document(
                page_content=doc.page_content,
                metadata=clean_metadata
            )
            filtered_docs.append(filtered_doc)
        else:
            print(f"[⚠️] Skipping doc at index {i}: Invalid type or missing metadata")
    except Exception as e:
        print(f"[❌] Error processing doc at index {i}: {e}")

print(f"[ℹ️] Filtered docs count: {len(filtered_docs)}")

# add to vectorDb
try:
    vectorstore = Chroma.from_documents(
        documents=filtered_docs,
        collection_name="rag-chroma",
        embedding=GPT4AllEmbeddings(),
    )
    retriever = vectorstore.as_retriever()
    print(f"[✅] Vectorstore created and retriever initialized")
except Exception as e:
    print(f"[❌] Error creating vectorstore or retriever: {e}")

[✅] Scraped: https://python.langchain.com/docs/introduction/
[ℹ️] Content length: 10885
[✅] Scraped: https://www.ai-jason.com/learning-ai/how-to-reduce-llm-cost
[ℹ️] Content length: 7064
[✅] Scraped: https://www.ai-jason.com/learning-ai/ai-agent-vision-tutorial
[ℹ️] Content length: 11777
[✅] Scraped: https://www.ai-jason.com/learning-ai/ai-research-agent
[ℹ️] Content length: 5937
[✅] Scraped: https://deepmind.google/discover/blog/alphaevolve-a-gemini-powered-coding-agent-for-designing-advanced-algorithms/
[ℹ️] Content length: 13164
[✅] Scraped: https://arxiv.org/abs/2110.04599
[ℹ️] Content length: 7248
[ℹ️] Docs already flat, count: 6
[ℹ️] Split into 80 document chunks
[ℹ️] Filtered docs count: 80
[✅] Vectorstore created and retriever initialized


# Grade Documents -> Retrieval Grader

In [6]:
from langchain.prompts import PromptTemplate
# from langchain_community.chat_models import ChatOllama
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import JsonOutputParser

llm = OllamaLLM(
    model=LOCAL_LLM,
    format="json",
    temperature=0.1,
    max_tokens=512,
    streaming=True,
    verbose=True,
)

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance of a retrieved document to a user question. If the document contains keywords related to the user question, grade it as relevant. It dos not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'YES' or 'NO' score to indicate whether the document is relevant to the question. \n 
    Provide the binary score as a JSON with a single key 'score' and no premable or explaination.
    <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document}\n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)

retrieval_grader = prompt | llm | JsonOutputParser()
question = "how to save LLM costs?"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

{'score': 'YES'}


# Generate Answer

In [7]:
from langchain.prompts import PromptTemplate
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
    Use three sentences maximum and keep the answer concise and to the point <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question} \n
    Context: {context} \n
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)

llm = OllamaLLM(
    model=LOCAL_LLM,
    temperature=0.1, 
    max_tokens=512, 
    streaming=True, 
    verbose=True
)

# post-processing
def format_docs(docs):
    return ("\n\n".join(doc.page_content for doc in docs))

# chain
rag_chain = prompt | llm | StrOutputParser()

question = "how to save LLM costs?"
docs = retriever.invoke(question)
generation = rag_chain.invoke({
    "context": docs,
    "question": question,
})
print(generation)

To save LLM costs, you can carefully select the right models for specific tasks, optimize agent memory, and use techniques like LLM Lingua to achieve cost savings while maintaining high performance and user experience. Additionally, staying updated with the latest research and developments in the field can help discover new cost optimization methods. This approach can reduce LLM costs by up to 78% or more.


# Web Search via Tavily -> check accuracy

In [12]:
from langchain_community.tools.tavily_search import TavilySearchResults

web_search_tool = TavilySearchResults(k=3)

# Hallucination Grader

In [9]:
llm = OllamaLLM(
    model=LOCAL_LLM,
    format="json",
    temperature=0.1, 
    max_tokens=512, 
    streaming=True, 
    verbose=True
)

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is grounded in / supported by a set of facts. Give a binary score 'YES' or 'NO' score to indicate whether the answer is grounded in / supported by the facts. Provide the binary score as a JSON with a single key 'score' and no premable or explaination. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are all the facts:
    \n -------- \n
    {documents}
    \n -------- \n
    Here is the answer: {generation} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)

hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({
    "documents": docs,
    "generation": generation,
})

{'score': 'YES'}

# Answer Grader

In [10]:
llm = OllamaLLM(
    model=LOCAL_LLM,
    format="json",
    temperature=0.1, 
    max_tokens=512, 
    streaming=True, 
    verbose=True
)

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is useful to resolve a question. Give a binary score 'YES' or 'NO' score to indicate whether the answer is useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no premable or explaination. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are all the facts:
    \n -------- \n
    {generation}
    \n -------- \n
    Here is the question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)

answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({
    "question": question,
    "generation": generation,
})

{'score': 'YES'}

# LangGraph -> Setup states & nodes

In [13]:
from typing_extensions import TypedDict
from typing import List
from langchain.schema import Document

# define graph state
class GraphState(TypedDict):
    """
    Represents the state of the graph.
    
    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search
        documents: list of documents
    """
    question: str
    generation: str
    web_search: str
    documents: List[str]


# define graph flow
def retrieve(state):
    """
    Retrieve docs from vectorstore
    
    Args:
        state (dict): the current graph state
    
    Returns:
        state (dict): new key added to the state, docs, that contains the retrieved documents
    """
    print("****RETRIEVE****")
    question = state["question"]
    documents = retriever.invoke(question)
    return {
        "documents": documents,
        "question": question,
    }

def grade_documents(state):
    """
    Determines whether the retrieved docs are relevant to the question.
    If any doc is relevant, we'll set a flag to run web search (for the same).

    Args:
        state (dict): the current graph state
    
    Returns:
        state (dict): filtered out irrelevant docs, and updated web_search state
    """
    print("****CHECK DOCUMENT RELEVANCE TO QUESTION****")
    question = state["question"]
    documents = state["documents"]
    
    # score each doc
    filtered_docs = []
    web_search = "NO"
    for d in documents:
        score = retrieval_grader.invoke({
            "question": question,
            "document": d.page_content,
        })
        grade = score['score']
        
        # doc relevant
        if grade.lower() == "YES":
            print("****GRADE: DOCUMENT RELEVANT****")
            filtered_docs.append(d)
        
        # doc not relevant
        else:
            print("****GRADE: DOCUMENT NOT RELEVANT****")
            # we do not include the doc in filtered_docs
            # we set a flag to indicate that we want to run web search
            web_search = "YES"
            continue
    
    return {
        "documents": filtered_docs,
        "question": question,
        "web_search": web_search,
    }

def generate(state):
    """
    Generate answer using RAG on retrieved docs
    
    Args:
        state (dict): the current graph state
    
    Returns:
        state (dict): new key added to the state, generation, that contains the LLM generation
    """
    print("****GENERATE****")
    question = state["question"]
    documents = state["documents"]
    
    # rag gen
    generation = rag_chain.invoke({
        "context": documents,
        "question": question,
    })
    return {
        "documents": documents,
        "question": question,
        "generation": generation,
    }

def web_search(state):
    """
    Web search based on the question
    
    Args:
        state (dict): the current graph state
    
    Returns:
        state (dict): appended web results to doc
    """
    print("****WEB SEARCH****")
    question = state["question"]
    documents = state["documents"]
    
    # web search
    docs = web_search_tool.invoke({"query": question})
    web_results = ("\n".join([d["content"] for d in docs]))
    web_results = Document(page_content=web_results)
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]
    
    return {
        "documents": documents,
        "question": question,
    }

# conditional edges

def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search
    
    Args:
        state (dict): the current graph state
    
    Returns:
        str: Binary decision for next node to call
    """
    print("****ASSESS GRADED DOCUMENTS****")
    question = state["question"]
    web_search = state["web_search"]
    filtered_docs = state["documents"]
    
    if web_search == "YES":
        # all docs have been filtered check_relevance
        # we'll regenerate a new query
        print("****DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH****")
        return "websearch"
    else:
        # we have relevant docs, we can generate an answer
        print("****DECISION: ALL DOCUMENTS ARE RELEVANT TO QUESTION, GENERATE ANSWER****")
        return "generate"

def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("****CHECK HALLUCINATIONS****")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score['score']

    # Check hallucination
    if grade == "YES":
        print("****DECISION: GENERATION IS GROUNDED IN DOCUMENTS****")
        # Check question-answering
        print("****GRADE GENERATION vs QUESTION****")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score['score']
        if grade == "YES":
            print("****DECISION: GENERATION ADDRESSES QUESTION****")
            return "useful"
        else:
            print("****DECISION: GENERATION DOES NOT ADDRESS QUESTION****")
            return "not useful"
    else:
        print("****DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY****")
        return "not supported"


# compile graph
from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)

# define nodes
workflow.add_node("websearch", web_search)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)

# build graph
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)

# compile graph
app = workflow.compile()

# test graph
from pprint import pprint

inputs = {"question": "how to save LLM costs?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key} with value: {value}")
print(value["generation"])

****RETRIEVE****
("Finished running: retrieve with value: {'documents': [Document(metadata={}, "
 "page_content='By implementing these strategies, you can reduce LLM costs by "
 'up to 78% or more. Remember, reducing costs while maintaining performance '
 'and user experience is a critical skill for AI startups. Stay proactive and '
 'continuously optimize your LLM usage to maximize efficiency and '
 'profitability.\\n\\n\\u200d\\n\\nGet free HubSpot AI For Marketers Course: '
 "[https://clickhubspot.com/xut](https://clickhubspot.com/xut)\\n\\n🔗 Links'), "
 "Document(metadata={}, page_content='By implementing these strategies, you "
 'can reduce LLM costs by up to 78% or more. Remember, reducing costs while '
 'maintaining performance and user experience is a critical skill for AI '
 'startups. Stay proactive and continuously optimize your LLM usage to '
 'maximize efficiency and profitability.\\n\\n\\u200d\\n\\nGet free HubSpot AI '
 'For Marketers Course: '
 "[https://clickhubspot.co