In [25]:
import os
from typing import List
from langchain_ollama import ChatOllama
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import TextLoader, WebBaseLoader
from langchain_community.document_loaders.youtube import YoutubeLoader
from langchain_community.document_loaders import ArxivLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langgraph.graph import StateGraph, END
from dotenv import load_dotenv

load_dotenv()

True

In [26]:
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

urls = [
    "https://docs.langchain.com/oss/python/langgraph/overview",
    "https://docs.langchain.com/oss/python/langgraph/workflows-agents",
    "https://docs.langchain.com/oss/python/langgraph/graph-api#map-reduce-and-the-send-api"
]

# Load the Docs
docs = [WebBaseLoader(url) for url in urls]

# Split the Docs into Chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=60)
chunks = text_splitter.split_documents([doc for loader in docs for doc in loader.load()])

# Create the Vector Store
vectorstore = FAISS.from_documents(chunks, embeddings)
retriever=vectorstore.as_retriever(search_kwargs={"k": 3})

In [27]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="Qwen/Qwen3-Coder-30B-A3B-Instruct", 
    temperature=0, 
    base_url="http://192.168.1.20:10000/v1", 
    api_key="123",
)

In [28]:
### Router
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

class RouteQuery(BaseModel):
    """Route a user query to the most relevant datasource."""
    datasource: Literal["vectorestore", "web_search"] = Field(
        ...,
        description="Given a user question choost to route it to web search or a vectorestore."
    )
    
# LLM With function calling
structured_llm_router = llm.with_structured_output(RouteQuery)

# Prompt
system = """Your are an expert at routing a user question to a vectorestore or web search.
The vectorstore contains documents related to agents, prompt engineering, and adversarial attacks on LLMs.
Use the vectorstore for questions on these topics. Otherwise, use web-search.
"""

route_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "Question: {question}")
])

question_router = route_prompt | structured_llm_router

In [29]:
question_router.invoke({"question": "What are the types of agent memory?"})

RouteQuery(datasource='vectorestore')

In [30]:
class GradeDocument(BaseModel):
   """Binary Score for relevance check on retrieved documents"""
   binary_score: str = Field(
       description="A binary score indicating whether the retrieved documents are relevant to the user's query. Return 'yes' if relevant, 'no' if not.",
   )
   
structured_llm_grader = llm.with_structured_output(GradeDocument)

system = """
You are a grader assessing relevance of a retrieved document to a usser question. \n
If the document contains keyword(s) or semantci meaning related to the question, grade it as relevant. \n
Give binary score 'yes' ord 'no' score to indicate whether the document is relevant to the question.
"""

grade_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "User Question: {question}\n\n Retrieved Document: {document}")
])

retrieval_grader = grade_prompt | structured_llm_grader
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content

In [37]:
from langchain_classic import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
          ("system", """You are an expert at question answering. Use the following context to answer the question.
            If you don't know the answer, just say "I don't know", don't try to make up an answer."""),
])

def format_docs(docs):
    return "\n\n".join([f"Document {i+1}:\n{doc.page_content}" for i, doc in enumerate(docs)])

rag_chain = prompt | llm | StrOutputParser()

generation = rag_chain.invoke({"context": format_docs(docs), "question": question})

In [39]:
system = """
Ypur are an expert question rewriter that converts an input question into a better version that is optimizesd \n
for web search. Look at the input try to reason about the underlying semantic intent / meaning.
"""

rewrite_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "here is the initial question: \n\n {question} \n Formulate an improved question.")
])

question_rewriter = rewrite_prompt | llm | StrOutputParser()

In [40]:
from langchain_tavily import TavilySearch
tavily = TavilySearch(max_results=3)

In [41]:
from typing import List, TypedDict

class GraphState(TypedDict):
    """Represents the state of our graph.
    
    Attributes:
        question: question
        generation: LLM generation
        web_search: wheter to add search
        documents: retrieved documents
    """
    question: str
    generation: str
    web_search: bool
    documents: List[str]

In [45]:
from langchain_core.documents import Document

def retrieve(state):
    """Retrieve relevant documents based on the question in the state."""
    
    print("---RETRIEVE---")
    question = state["question"]
    
    # Retrieval
    documents = retriever.invoke(question)
    return {'documents': documents, 'question': question}


def generate(state):
    """Generate an answer based on the question and retrieved documents in the state."""
    
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    
    # Format documents for context
    context = format_docs(documents)
    
    # Generation
    generation = rag_chain.invoke({"context": context, "question": question})
    return {'generation': generation, 'question': question}

def grade_documents(state):
    """Grade the relevance of retrieved documents in the state."""
    
    print("---GRADE DOCUMENTS---")
    question = state["question"]
    documents = state["documents"]
    
    graded_docs = []
    web_search = "No"
    for doc in documents:
        score = retrieval_grader.invoke({"question": question, "document": doc.page_content})
        grade = score.binary_score
        
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            graded_docs.append(doc)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            web_search = "Yes"
            continue            
        
    
    return {'documents': graded_docs, 'question': question, "web_search": web_search}


def transform_query(state):
    """Transform the question in the state using web search if needed."""
    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]
    
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}

def web_search(state):
    """Perform web search to retrieve additional documents if needed."""
    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]
    
    # Get the raw response from Tavily
    raw_response = tavily.invoke(question)
    
    # Handle different response formats
    if isinstance(raw_response, str):
        # If it's already a string, use it directly
        web_results = raw_response
    elif isinstance(raw_response, dict):
        # If it's a dict, extract content
        if "results" in raw_response:
            # Extract content from results
            content_list = [r.get("content", "") for r in raw_response["results"]]
            web_results = "\n".join(content_list)
        else:
            web_results = raw_response.get("content", str(raw_response))
    elif isinstance(raw_response, list):
        # If it's a list, extract content
        content_list = [r.get("content", "") if isinstance(r, dict) else str(r) for r in raw_response]
        web_results = "\n".join(content_list)
    else:
        web_results = str(raw_response)
    
    web_results_doc = Document(page_content=web_results)
    documents.append(web_results_doc)
    return {"documents": documents, "question": question}

def route_question(state):
    """Route qestion to vectorestore or web search based on content."""
    print("---ROUTE QUESTION---")
    question = state["question"]
    
    route = question_router.invoke({"question": question})
    if route.datasource == "vectorestore":
        print("---ROUTE TO VECTORESTORE---")
        return "retrieve"
    else:
        print("---ROUTE TO WEB SEARCH---")
        return "transform_query"


In [46]:
def decide_to_generate(state):
    """Determine whether to generate an answer based on the relevance of documents."""
    print("---DECIDE TO GENERATE---")
    web_search_needed = state["web_search"]
    
    if web_search_needed == "Yes":
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTIN, TRANSFORM QUERY---")
        return "transform_query"
    else:
        print("---DECISION: GENERATE---")
        return "generate"

In [47]:
from langgraph.graph import StateGraph, END, START

workflow = StateGraph(GraphState)

workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("transform_query", transform_query)
workflow.add_node("web_search_node", web_search)

workflow.add_conditional_edges(
    START,
    route_question,
    {
        "web_search_node": "generate",
        "vectorstore": "retrieve"
    }
)
workflow.add_edge("web_search_node", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate"
    }
)

workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_documents,
    {
     "not supported": "generate",
     "useful": END,
     "not usefule": "transform_query",        
    },
)

app = workflow.compile()


In [None]:
result = app.invoke({"question": "What are the types of agent memory?"})