In [None]:
! pip install langchain-nomic langchain_community tiktoken langchainhub chromadb langchain langgraph tavily-python gpt4all firecrawl-py pymupdf langchain-ollama

In [None]:
import os

# LangSmith tracing settings; the API key below is a placeholder that must be
# replaced before running.
os.environ.update({
    'LANGCHAIN_TRACING_V2': 'true',
    'LANGCHAIN_ENDPOINT': 'https://api.smith.langchain.com',
    'LANGSMITH_API_KEY': 'your langsmith api key',
})

# Model name handed to ChatOllama wherever an LLM is created.
local_llm = 'llama3.2'

In [3]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain.docstore.document import Document
from langchain_community.vectorstores.utils import filter_complex_metadata

In [4]:
# Loading multiple PDFs from a source folder
from langchain_community.document_loaders import FileSystemBlobLoader
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers import PyMuPDFParser

# Select the PDF files matching '*.pdf' under ./data and parse them with PyMuPDF.
pdf_blobs = FileSystemBlobLoader(path='./data', glob='*.pdf')
loader = GenericLoader(blob_loader=pdf_blobs, blob_parser=PyMuPDFParser())

# Load everything up front and report how many documents were produced.
docs = loader.load()
print(len(docs))


368


In [5]:
# Splitter built from the tiktoken encoder: chunk size 500, no overlap between chunks.
splitter_settings = {"chunk_size": 500, "chunk_overlap": 0}
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(**splitter_settings)

# Split every loaded document into chunks.
doc_splits = text_splitter.split_documents(docs)

In [6]:
# The current doc splits carry metadata; the next cell drops the metadata values that are not simple scalars (str/int/float/bool).
#print(doc_splits[0])

In [7]:
# Rebuild each chunk as a fresh Document, keeping only:
#   - chunks whose text is at least 7 characters long, and
#   - metadata entries whose value is a str, int, float or bool.
# NOTE(review): the length check is on len(page_content), i.e. characters,
# not words — confirm which threshold is intended.
filtered_doc = [
    Document(
        page_content=chunk.page_content,
        metadata={
            key: value
            for key, value in chunk.metadata.items()
            if isinstance(value, (str, int, float, bool))
        },
    )
    for chunk in doc_splits
    if isinstance(chunk, Document)
    and hasattr(chunk, 'metadata')
    and len(chunk.page_content) >= 7
]

In [None]:
#print(filtered_doc[0:2])

In [11]:
# vectorDB
#from langchain_ollama import OllamaEmbeddings, much slower than GPT4AllEmbeddings
#from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
#from langchain_community.document_transformers import RankLLMReranker

# Embed the cleaned chunks with GPT4All and store them in the 'lk_rag' Chroma collection.
vectorstore = Chroma.from_documents(
    documents=filtered_doc,
    embedding=GPT4AllEmbeddings(),
    collection_name='lk_rag',
)

# details: https://python.langchain.com/api_reference/chroma/vectorstores/langchain_chroma.vectorstores.Chroma.html#langchain_chroma.vectorstores.Chroma.as_retriever
# Ask for up to 5 results and discard those scoring below 0.5.
search_settings = {"k": 5, "score_threshold": 0.5}
retriever = vectorstore.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs=search_settings,
)

# reranker = RankLLMReranker(
#     model_name = "RankZephyr",
#     top_n = 3,
# )

# retriever = ContextualCompressionRetriever(
#     base_compressor= reranker,
#     base_retriever= retriever,
# )




In [None]:
# Retrieval grader: LLM and prompt that judge whether a retrieved document is relevant to the question

from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# JSON output mode, because the grader's reply is parsed by JsonOutputParser.
llm = ChatOllama(model=local_llm, format="json", temperature=0.0)

# Prompt asking the model for a binary relevance verdict, {"score": "yes"|"no"},
# on one retrieved document ({documents}) for the user question ({question}).
prompt = PromptTemplate(
    template="""<|start_header_id|>system<|end_header_id|>
You are a grader assessing the relevance of a retrieved document to a user question.
If the document contains keywords related to the user question, grade it as relevant.
This does not need to be a stringent test—the goal is to filter out erroneous retrievals.

Provide a binary score as JSON with a single key `"score"` and a value of `"yes"` or `"no"`, without any explanation or extra text.
<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Here is the retrieved document:

{documents}

Here is the user question:

{question}
<|eot_id|>
""",
    input_variables=["question", "documents"]
)


In [None]:
# Pipe the prompt output into the llm, then into JsonOutputParser(); "|" chains the steps (LCEL).
retrieval_grader = prompt | llm | JsonOutputParser()
question = "How does MapReduce work?"

docs = retriever.invoke(question)
if docs:
    doc_text = docs[0].page_content
    print(doc_text)
    # Expected to print a dict whose "score" is 'yes' when the retrieval is relevant, 'no' otherwise.
    print(retrieval_grader.invoke({"question": question, "documents": doc_text}))
else:
    # The retriever filters by score threshold, so an empty result is possible;
    # indexing docs[0] would raise IndexError in that case.
    print("No documents passed the similarity threshold for this question.")

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain import hub

# Plain-text (non-JSON) model used for answer generation.
llm = ChatOllama(model=local_llm, temperature=0.0)

# Question-answering prompt: answer {question} from the retrieved {documents},
# concisely, admitting when the answer is unknown.
prompt = PromptTemplate(
    input_variables=["question", "documents"],
    template="""<|start_header_id|>system<|end_header_id|>
You are an assistant for question-answering tasks.
Use the following retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use a maximum of three sentences and keep the answer concise.
<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Question: {question}
Documents: {documents}
<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>
""",
)

def format_docs(docs):
    """Join the `page_content` of each item in `docs` into one string.

    Items are separated by a blank line; an empty iterable yields "".
    """
    page_texts = [item.page_content for item in docs]
    return "\n\n".join(page_texts)



In [None]:
#run 
# Chain: prompt -> llm -> plain string output.
rag_chain = prompt | llm | StrOutputParser()

# Try the chain on one sample question using the retrieved documents as context.
question = "what does serverless computing relies on?"
docs = retriever.invoke(question)
rag_inputs = {"question": question, "documents": format_docs(docs)}
answer = rag_chain.invoke(rag_inputs)
print(answer)

Serverless computing relies on cloud providers' infrastructure, such as Amazon Web Services (AWS), Microsoft Azure, or Google Cloud Platform (GCP). It also depends on the scalability and elasticity of these platforms to handle variable workloads. Additionally, serverless computing is built upon the concept of event-driven programming and function-as-a-service models.


## If No Relevant Info from the Input Local Knowledge Base, Look up Online

In [None]:
# Placeholder value: replace with a real Tavily API key before running.
os.environ['TAVILY_API_KEY'] = "tvly-xxxxxx"

from langchain_community.tools.tavily_search import TavilySearchResults
# The tool's result-count field is `max_results`; `k` is not a declared field
# of TavilySearchResults, so it does not control the number of results.
web_search_tool = TavilySearchResults(max_results=5)

## Check For Hallucination

In [None]:
# JSON output mode (as for the retrieval grader), since the reply is fed to JsonOutputParser.
llm = ChatOllama(model=local_llm, format="json", temperature=0.0)

# Prompt asking whether {answer} is grounded in the facts given as {documents};
# the expected reply is a JSON object with a single "score" key.
prompt = PromptTemplate(
    template="""<|start_header_id|>system<|end_header_id|>
You are a grader assessing whether an answer is grounded in
or supported by a set of facts. Give a binary score 'yes' or 'no'
to indicate whether the answer is grounded or supported by the facts.
Provide the binary score as a JSON with a single key "score" and no preamble or explanation.
<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Here are the facts:
\n --- \n
{documents}
\n --- \n
Here is the answer: {answer}
<|eot_id|>
""",
    input_variables=["documents", "answer"]
)

hallucination_grader = prompt | llm | JsonOutputParser()
# Pass the joined page text, not the repr of a list of Document objects,
# so the "facts" section contains the same text the answer was generated from.
hallucination_grader.invoke({"documents": format_docs(docs), "answer": answer})

{'score': 'no'}

## Answer Grader

In [None]:
# JSON output mode (as for the retrieval grader), since the reply is fed to JsonOutputParser.
llm = ChatOllama(model=local_llm, format="json", temperature=0.0)

# Prompt asking whether {answer} is useful for {question};
# the expected reply is a JSON object with a single "score" key.
prompt = PromptTemplate(
    template="""<|start_header_id|>system<|end_header_id|>
You are a grader assessing whether an answer is useful to answer a question. Give a binary score 'yes' or 'no'
to indicate whether the answer is useful to answer a question.
Provide the binary score as a JSON with a single key "score" and no preamble or explanation.
<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Here are the question:
\n --- \n
{question}
\n --- \n
Here is the answer: {answer}
<|eot_id|>
""",
    input_variables=["question", "answer"]
)

answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question, "answer": answer})

{'score': 'yes'}

## LangGraph

In [None]:
from typing_extensions import TypedDict
from typing import List
from langchain.schema import Document

# States
class GraphState(TypedDict):
    """
    Shared state handed from node to node in the graph.

    Keys:
    - question: the question being answered
    - answer: the answer produced for the question
    - web_search: flag telling whether a web search should be added
    - documents: the documents used to answer the question
    """

    # User question driving the run.
    question: str
    # Generated answer text.
    answer: str
    # True when a web search should be added.
    web_search: bool
    # NOTE(review): annotated as List[str], but the nodes in this notebook
    # read `.page_content` from these items — confirm the intended element type.
    documents: List[str]


# Nodes

def retrieve(state):
    """
    Look up documents for the question through the module-level retriever.

    input: state(dict): graph state; reads "question"
    output: dict holding the retrieved "documents" and the same "question"
    """
    print("*** RETRIEVE ***")
    user_question = state["question"]
    return {
        "documents": retriever.invoke(user_question),
        "question": user_question,
    }

def generate(state):
    """
    Generate an answer from the documents currently in the state.

    input: state(dict): graph state; reads "documents" and "question"
    output: dict with the unchanged "documents" and "question" plus the
            newly generated "answer"
    """
    print("*** GENERATE ***")
    docs = state["documents"]
    question = state["question"]
    # Give the prompt the joined page text (as the standalone rag_chain run
    # does), not the repr of a list of Document objects.
    context = format_docs(docs)
    answer = rag_chain.invoke({"question": question, "documents": context})
    return {"documents": docs, "answer": answer, "question": question}

def grade_documents(state):
    """
    Grade each retrieved document for relevance and decide whether a web
    search is needed.

    input: state(dict): graph state; reads "documents" and "question"
    output: dict with the relevant "documents", the "question", and a
            "web_search" flag that is True when at least one document was
            rejected or when no relevant document is left.
    """
    print("*** GRADE DOCUMENTS ***")
    docs = state["documents"]
    question = state["question"]

    filtered_docs = []
    needs_web_search = False

    for doc in docs:
        score = retrieval_grader.invoke({"question": question, "documents": doc.page_content})
        # Treat a missing or non-string "score" as "not relevant" instead of raising.
        grade = str(score.get("score", "")) if isinstance(score, dict) else ""
        if grade.lower() == "yes":
            print("Graded: Document is relevant")
            filtered_docs.append(doc)
        else:
            print("Graded: Document is not relevant")
            needs_web_search = True

    # Nothing relevant (including an empty retrieval): generating from an
    # empty context is pointless, so route through web search.
    if not filtered_docs:
        needs_web_search = True

    return {"documents": filtered_docs, "question": question, "web_search": needs_web_search}


def web_search(state):
    """
    Search the web for the question and add the result to the documents.

    input: state(dict): graph state; reads "question" and, if present, "documents"
    output: dict with "documents" (existing ones plus one Document holding
            the joined web results) and the "question"
    """
    print("*** WEB SEARCH ***")
    question = state["question"]
    # Work on a copy so the list held by the incoming state is not mutated in
    # place; a missing or None "documents" entry is treated as an empty list.
    docs = list(state.get("documents") or [])

    searched_docs = web_search_tool.invoke({"query": question})
    # Merge the "content" of every search hit into a single Document.
    web_results = "\n".join([d["content"] for d in searched_docs])
    docs.append(Document(page_content=web_results))
    return {"documents": docs, "question": question}


In [None]:
### Condition edges

def decide_to_generate(state):
    """
    Decide whether to generate an answer or run a web search first.

    input: state(dict): graph state; reads "web_search"
    output: str: routing label for the next node, "websearch" or "generate"
    """
    print("*** Assess Graded Documents ***")
    if state["web_search"]:
        print("Web search is required")
        return "websearch"

    print("Web search is not required")
    return "generate"
    
def is_hallucination_and_useful(state):
    """
    Check whether the answer is grounded in the documents and, if so,
    whether it is useful for the question.

    input: state(dict): graph state; reads "question", "answer", "documents"
    output: str: routing label — "useful", "not useful", or
            "not grounded/supported"
    """

    print("*** Assessing Hallucination ***")
    question = state["question"]
    answer = state["answer"]
    docs = state["documents"]

    # Grade against the joined page text rather than the repr of Document
    # objects, so the grader sees the same context text format_docs produces.
    facts = format_docs(docs)
    grounded = hallucination_grader.invoke({"documents": facts, "answer": answer})
    if grounded["score"].lower() != "yes":
        print("Graded: Answer is not grounded, Re-try")
        return "not grounded/supported"

    print("Graded: Answer is grounded")

    print("Grading answer vs question")
    useful = answer_grader.invoke({"question": question, "answer": answer})
    if useful["score"].lower() == "yes":
        print("Graded: Answer is useful")
        return "useful"

    print("Graded: Answer is not useful")
    return "not useful"

In [None]:
from langgraph.graph import END, StateGraph
# Create the graph with GraphState as its state schema.
workflow = StateGraph(GraphState)

# Register each node function under the name used when wiring edges.
workflow.add_node("websearch", web_search)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)

<langgraph.graph.state.StateGraph at 0x14f163740>

In [None]:
# Every run starts at "retrieve", and retrieval is always followed by grading.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
# After grading, decide_to_generate returns "websearch" or "generate";
# the mapping turns that label into the next node.
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
# A web search is always followed by generation.
workflow.add_edge("websearch", "generate")  
# After generation, is_hallucination_and_useful picks the route:
# finish, search the web again, or generate again.
workflow.add_conditional_edges(
    "generate", 
    is_hallucination_and_useful,
    {
        "useful": END,
        "not useful": "websearch", 
        "not grounded/supported": "generate",
    })

<langgraph.graph.state.StateGraph at 0x14f163740>

In [None]:
# Compile
# Turn the wired workflow into a runnable app.
app = workflow.compile()

# Smoke test: stream one question through the graph.
from pprint import pprint

inputs = {"question": "Is TensorFlow created by Google?"}

for output in app.stream(inputs):
    for node_name, node_state in output.items():
        pprint(f"Finished running: {node_name}:")
# node_state is the value from the last streamed item; print its answer.
print("*****************************",node_state["answer"])