# Local RAG agent

![SNOWFALL](RAG_Model_Architecture.png) 


### Using Ollama and llama 3.2

In [1]:
from langchain_ollama import ChatOllama

local_llm = "llama3.2:1b-instruct-fp16"
llm = ChatOllama(model=local_llm, temperature=0)
llm_json_mode = ChatOllama(model=local_llm, temperature=0, format="json")

## Search - Tavily

In [2]:
import os
import getpass


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("TAVILY_API_KEY")
os.environ["TOKENIZERS_PARALLELISM"] = "true"

### Tracing (optional)
LangSmith for tracing

In [3]:
_set_env("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "local-llama32-rag"

### Vectorstore

In [4]:
import os
import re
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import SKLearnVectorStore
from langchain_nomic.embeddings import NomicEmbeddings
from langchain.embeddings import fastembed
from PyPDF2 import PdfReader

pdf_path = "./data/ersattningsmodell_vaders_2019.pdf"

def clean_text(text: str) -> str:

    if not isinstance(text, str):
        raise ValueError("Input must be a string.")

    # Remove excessive newlines and leading/trailing whitespace
    text = re.sub(r'\n+', '\n', text).strip()

    # Replace multiple spaces with a single space
    text = re.sub(r'\s{2,}', ' ', text)

    return text

# Function to parse PDF and convert to Documents
def parse_pdf_with_pypdf(pdf_path):
    """Extracts text from a PDF using PyPDF2 and converts each page to a LangChain Document."""
    reader = PdfReader(pdf_path)
    documents = []
    
    for page_num, page in enumerate(reader.pages):
        text = page.extract_text()
        if text:
            cleaned_text = clean_text(text)  # Clean the text immediately after extraction
            documents.append(Document(page_content=cleaned_text, metadata={"page_number": page_num + 1}))
    
    return documents

# Load PDF and parse it
docs_list = parse_pdf_with_pypdf(pdf_path)

# Cleaning the text of each document
for doc in docs_list:
    doc.page_content = clean_text(doc.page_content)

print(f"Extracted {len(docs_list)} pages from PDF.")

USER_AGENT environment variable not set, consider setting it to identify your requests.


Extracted 42 pages from PDF.


In [5]:
# Split documents
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=2000, chunk_overlap=300
)
doc_splits = text_splitter.split_documents(docs_list)

# Add to vectorDB
vectorstore = SKLearnVectorStore.from_documents(
    documents=doc_splits,
    embedding=NomicEmbeddings(model="nomic-embed-text-v1.5", inference_mode="local", device="gpu"),
)

# Create retriever
retriever = vectorstore.as_retriever(k=3)

RuntimeError: The 'gpt4all' package is required for local inference. Suggestion: `pip install "nomic[local]"`

### Components

In [6]:
### Router
import json
from langchain_core.messages import HumanMessage, SystemMessage

# Prompt
router_instructions = """You are an expert at Swedish and winter road maintenance employed by Klimator.

The vectorstore contains documents related to meteorological facts, winter road maintance laws, Road Weather Forecast and related fields of study.

Use the vectorstore for questions on these topics. For all else, and especially for current events, use web-search.

Return JSON with single key, datasource, that is 'websearch' or 'vectorstore' depending on the question."""

# Test router
test_web_search = llm_json_mode.invoke(
    [SystemMessage(content=router_instructions)]
    + [
        HumanMessage(
            content="Vilken är den senaste utvecklingen inom AI-teknik?"
        )
    ]
)
test_web_search_2 = llm_json_mode.invoke(
    [SystemMessage(content=router_instructions)]
    + [HumanMessage(content="Vad är syftet med att samla in mätdata från 14 dagar före starttiden?")]
)
test_vector_store = llm_json_mode.invoke(
    [SystemMessage(content=router_instructions)]
    + [HumanMessage(content="Vilka regler gäller för att en viss vädersituation ska räknas under en hel timme?")]
)
print(
    json.loads(test_web_search.content),
    json.loads(test_web_search_2.content),
    json.loads(test_vector_store.content),
)

ConnectError: [Errno 61] Connection refused

### Retrieval Grader

In [None]:
### Retrieval Grader

# Doc grader instructions
doc_grader_instructions = """You are a grader assessing relevance of a retrieved document to a user question.

If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant."""

# Grader prompt
doc_grader_prompt = """Here is the retrieved document: \n\n {document} \n\n Here is the user question: \n\n {question}. 

This carefully and objectively assess whether the document contains at least some information that is relevant to the question.

Return JSON with single key, binary_score, that is 'yes' or 'no' score to indicate whether the document contains at least some information that is relevant to the question."""

# Test
question = "Hur kombineras data från VViS och MESAN för att skapa väderutfall?"
docs = retriever.invoke(question)
doc_txt = docs[3].page_content
doc_grader_prompt_formatted = doc_grader_prompt.format(
    document=doc_txt, question=question
)
result = llm_json_mode.invoke(
    [SystemMessage(content=doc_grader_instructions)]
    + [HumanMessage(content=doc_grader_prompt_formatted)]
)
json.loads(result.content)

{'binary_score': 'no'}

In [None]:
doc_txt

'2018- 09-05 5 vilket kan innebära att observationer/värden som hämtas eller bygger på VV iS-data över tid inte är direkt jämförbara. 1.4.2 Bortfall av mätdata från VViS Bortfall innebär att mätdata saknas för en eller flera av variablerna lufttemperatur, väg ytetemperatur, relativ luftfuktighet och daggpunktstemperatur. Det finns tre typer av bortfal l. 1. Den första typen är kortvarig och innebär att en VViS -station har bortfall under högst 4 halvtimmar i följd. Då kopieras den saknade informationen från det sista giltiga mätvärdet. 2. Den andra typen av bortfall är längre än 2 timmar i följd. Då ersätts bortfallet med information från reservstationen från första bortfallshalvtimmen. Ersättningsdata kan t.ex. vara lufttemperatur. På fliken Bortfall preciseras användning av ersättningsdata ytterligare genom att man tillsammans med första och sista tidpunkt för ersättningsdata anger vilken ordinarie VViS -station som ersatts av vilken reserv. 3. Vid den tredje typen av bortfall har äv

### Generate

In [None]:
### Generate

# Prompt
rag_prompt = """You are an assistant for question-answering tasks in swedish. 

Here is the context to use to answer the question:

{context}

Think carefully about the above context. 

Now, review the user question:

{question}

Provide an answer to this questions using only the above context. 

Use three sentences maximum and keep the answer concise.

Answer:"""


# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Test
docs = retriever.invoke(question)
docs_txt = format_docs(docs)
rag_prompt_formatted = rag_prompt.format(context=docs_txt, question=question)
generation = llm.invoke([HumanMessage(content=rag_prompt_formatted)])
print(generation.content)

Data från VViS och MESAN kombineras genom att en medelvärde beräknas för varje typ av mätdata som saknas, baserat på ett mätvärde från ordinarie VViS-stationen. Denna medelvärde används sedan i kombinationen som har bortfall ersattes av det. Exempelvis kan temperaturdata beräknas för perioder med totalt bortfall tillsammans med en angivelse av att temperaturdata saknas.


### Halluciantion Grader

In [None]:
# Hallucination grader instructions
hallucination_grader_instructions = """

You are a teacher grading a quiz in Swedish. 

You will be given FACTS and a STUDENT ANSWER. 

Here is the grade criteria to follow:

(1) Ensure the STUDENT ANSWER is grounded in the FACTS. 

(2) Ensure the STUDENT ANSWER does not contain "hallucinated" information outside the scope of the FACTS.

Score:

A score of yes means that the student's answer meets all of the criteria. This is the highest (best) score. 

A score of no means that the student's answer does not meet all of the criteria. This is the lowest possible score you can give.

Explain your reasoning in a step-by-step manner to ensure your reasoning and conclusion are correct. 

Avoid simply stating the correct answer at the outset."""

# Grader prompt
hallucination_grader_prompt = """FACTS: \n\n {documents} \n\n STUDENT ANSWER: {generation}. 

Return JSON with two two keys, binary_score is 'yes' or 'no' score to indicate whether the STUDENT ANSWER is grounded in the FACTS. And a key, explanation, that contains an explanation of the score."""

# Test using documents and generation from above
hallucination_grader_prompt_formatted = hallucination_grader_prompt.format(
    documents=docs_txt, generation=generation.content
)
result = llm_json_mode.invoke(
    [SystemMessage(content=hallucination_grader_instructions)]
    + [HumanMessage(content=hallucination_grader_prompt_formatted)]
)
json.loads(result.content)

{'binary_score': 'yes',
 'explanation': 'Studenter svar är grundat i fakta och inte på fördom eller misstänkelse.'}

In [None]:
### Answer Grader

# Answer grader instructions
answer_grader_instructions = """You are a teacher grading a quiz in Swedish. 

You will be given a QUESTION and a STUDENT ANSWER. 

Here is the grade criteria to follow:

(1) The STUDENT ANSWER helps to answer the QUESTION

Score:

A score of yes means that the student's answer meets all of the criteria. This is the highest (best) score. 

The student can receive a score of yes if the answer contains extra information that is not explicitly asked for in the question.

A score of no means that the student's answer does not meet all of the criteria. This is the lowest possible score you can give.

Explain your reasoning in a step-by-step manner to ensure your reasoning and conclusion are correct. 

Avoid simply stating the correct answer at the outset."""

# Grader prompt
answer_grader_prompt = """QUESTION: \n\n {question} \n\n STUDENT ANSWER: {generation}. 

Return JSON with two two keys, binary_score is 'yes' or 'no' score to indicate whether the STUDENT ANSWER meets the criteria. And a key, explanation, that contains an explanation of the score."""

# Test
question = "Vad är MESAN"
answer = "MESAN står för MESoskalig ANalys, vilket i korthet innebär numerisk analys av olika meteorologiska parametrar på en nivå som gör att väderfenomen med en rumslig upplösning på ungefär 5–50 km kan beskrivas."

# Test using question and generation from above
answer_grader_prompt_formatted = answer_grader_prompt.format(
    question=question, generation=answer
)
result = llm_json_mode.invoke(
    [SystemMessage(content=answer_grader_instructions)]
    + [HumanMessage(content=answer_grader_prompt_formatted)]
)
json.loads(result.content)

{'binary_score': 'no',
 'explanation': 'Studentens svar innehåller endast en del av informationen som frågan förespråkar. Det är inte tillräckligt med att beskriva MESAN, och studenten har inte tagit hänsyn till rumslig upplösning i analyserna.'}

## Web Search Tool

In [None]:
from langchain_community.tools.tavily_search import TavilySearchResults

web_search_tool = TavilySearchResults(k=3)

#web_search_tool.invoke("What is Klimator?")

# LangGraph

In [None]:
import operator
from typing_extensions import TypedDict
from typing import List, Annotated


class GraphState(TypedDict):
    """
    Graph state is a dictionary that contains information we want to propagate to, and modify in, each graph node.
    """

    question: str  # User question
    generation: str  # LLM generation
    web_search: str  # Binary decision to run web search
    max_retries: int  # Max number of retries for answer generation
    answers: int  # Number of answers generated
    loop_step: Annotated[int, operator.add]
    documents: List[str]  # List of retrieved documents

### Nodes 

In [None]:
from langchain.schema import Document
from langgraph.graph import END


### Nodes
def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Write retrieved documents to documents key in state
    documents = retriever.invoke(question)
    return {"documents": documents}


def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    loop_step = state.get("loop_step", 0)

    # RAG generation
    docs_txt = format_docs(documents)
    rag_prompt_formatted = rag_prompt.format(context=docs_txt, question=question)
    generation = llm.invoke([HumanMessage(content=rag_prompt_formatted)])
    return {"generation": generation, "loop_step": loop_step + 1}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    
    # Track wether any relevant document is found
    relevant_doc_found = False

    for d in documents:
        doc_grader_prompt_formatted = doc_grader_prompt.format(
            document=d.page_content, question=question
        )
        result = llm_json_mode.invoke(
            [SystemMessage(content=doc_grader_instructions)]
            + [HumanMessage(content=doc_grader_prompt_formatted)]
        )
        grade = json.loads(result.content)["binary_score"]
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
            relevant_doc_found = True

        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            continue
    
    web_search = "No" if relevant_doc_found else "Yes"

    return {"documents": filtered_docs, "web_search": web_search}


def web_search(state):
    """
    Web search based based on the question

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Appended web results to documents
    """

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state.get("documents", [])

    # Web search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    documents.append(web_results)
    return {"documents": documents}

### Edges

In [None]:
def route_question(state):
    """
    Route question to web search or RAG

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    route_question = llm_json_mode.invoke(
        [SystemMessage(content=router_instructions)]
        + [HumanMessage(content=state["question"])]
    )
    source = json.loads(route_question.content)["datasource"]
    if source == "websearch":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    elif source == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]
    web_search = state["web_search"]
    filtered_documents = state["documents"]

    if web_search == "Yes":
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print(
            "---DECISION: NOT ALL DOCUMENTS ARE RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
        )
        return "websearch"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    max_retries = state.get("max_retries", 3)  # Default to 3 if not provided

    hallucination_grader_prompt_formatted = hallucination_grader_prompt.format(
        documents=format_docs(documents), generation=generation.content
    )
    result = llm_json_mode.invoke(
        [SystemMessage(content=hallucination_grader_instructions)]
        + [HumanMessage(content=hallucination_grader_prompt_formatted)]
    )
    grade = json.loads(result.content)["binary_score"]

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        # Test using question and generation from above
        answer_grader_prompt_formatted = answer_grader_prompt.format(
            question=question, generation=generation.content
        )
        result = llm_json_mode.invoke(
            [SystemMessage(content=answer_grader_instructions)]
            + [HumanMessage(content=answer_grader_prompt_formatted)]
        )
        grade = json.loads(result.content)["binary_score"]
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        elif state["loop_step"] <= max_retries:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
        else:
            print("---DECISION: MAX RETRIES REACHED---")
            return "max retries"
    elif state["loop_step"] <= max_retries:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"
    else:
        print("---DECISION: MAX RETRIES REACHED---")
        return "max retries"

## Control Flow

In [None]:
from langgraph.graph import StateGraph
from IPython.display import Image, display

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate

# Build graph
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
        "max retries": END,
    },
)

# Compile
graph = workflow.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

NameError: name 'GraphState' is not defined

In [None]:
inputs = {"question": "Hur beräknas ersättningsunderlaget vid upprepade snöfall eller snödrev?", "max_retries": 3}
for event in graph.stream(inputs, stream_mode="values"):
    print(event)

---ROUTE QUESTION---
---ROUTE QUESTION TO RAG---
{'question': 'Hur beräknas ersättningsunderlaget vid upprepade snöfall eller snödrev?', 'max_retries': 3, 'loop_step': 0}
---RETRIEVE---
{'question': 'Hur beräknas ersättningsunderlaget vid upprepade snöfall eller snödrev?', 'max_retries': 3, 'loop_step': 0, 'documents': [Document(metadata={'id': 'a52c0938-a887-4fcc-ba94-9aa95fd5ce08', 'page_number': 21}, page_content='21 4 Beräkning av ersättningsunderlag Utgångspunkten för beräkningar av väderutfall – grunden för ersättning av vinterväghållningsåtgärder – är de väderbeskrivningar på timnivå som tas fram inom ett driftområde för varje kombination av VViS -station och MESAN -ruta. Beräkningar av väderutfall görs för en kombination i taget och sammanfattas sedan för hela driftområdet. Beräkningen utförs stegvis enligt prioritetsordning. Varje beräkningssteg om fattar tre moment; Första momentet i varje beräkningssteg är att sammanföra vädersituationerna på timnivå till längre vädertillfäl