# Section 4: Streaming, Memory, and Human-in-the-Loop

![HIL](../images/hil.png)

In this section, we'll cover a series of topics that are important to leveraging the full power of LangGraph. Here are the topics:

1. Streaming: How does streaming work with LangGraph?
2. Memory: How can we persist information in our app across invocations?
    - Local memory
    - Threads
    - External memory
3. Breakpoints and Human-in-the-loop

After this section, we'll take a closer look at memory and HIL in LangGraph Studio, as well as production monitoring in LangSmith.

## Setup

In [None]:
required_packages = [
    "langgraph", 
    "langgraph-sdk", 
    "langgraph-checkpoint-sqlite", 
    "langsmith", 
    "langchain-community", 
    "langchain-core", 
    "langchain-openai", 
    "notebook", 
    "python-dotenv", 
    "chromadb"
]

# Install each package
for package in required_packages:
    %pip install {package}

In [2]:
import os

# Set environment variables.
# Paste your keys between the quotes below. Blank entries are skipped, so a key that is
# already exported in your shell (or loaded from .env in the next cell) is not overwritten
# with an empty string.
NOTEBOOK_ENV = {
    "OPENAI_API_KEY": "",
    "LANGCHAIN_API_KEY": "",
    "LANGCHAIN_TRACING_V2": "true",
    "LANGCHAIN_PROJECT": "langgraph-101",
}
for env_name, env_value in NOTEBOOK_ENV.items():
    if env_value:
        os.environ[env_name] = env_value

In [None]:
# Alternatively, read the same variables from a .env file in the parent directory.
# override=True lets entries in that file take precedence over values already in os.environ.
from dotenv import load_dotenv

DOTENV_PATH = "../.env"
load_dotenv(dotenv_path=DOTENV_PATH, override=True)

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# Single-turn RAG prompt: formatted with the user's {question} and a {context} string built
# by joining the page_content of the documents currently in state.
RAG_PROMPT = """You are an assistant for question-answering tasks. 
Use the following pieces of retrieved context to answer the question. 
If you don't know the answer, just say that you don't know. 
Use three sentences maximum and keep the answer concise.

Question: {question} 
Context: {context} 
Answer:"""

# Multi-turn variant: {conversation} is the chat history rendered as text and the model
# answers the latest question in it. There is no separate {question} slot.
RAG_PROMPT_WITH_MESSAGES = """You are an assistant for question-answering tasks. 
Use the following pieces of retrieved context to answer the latest question in the conversation. 
If you don't know the answer, just say that you don't know. 
The pre-existing conversation may provide important context to the question.
Use three sentences maximum and keep the answer concise.

Conversation: {conversation}
Context: {context} 
Answer:"""

# LangGraph documentation pages that get_vector_db_retriever() fetches and indexes.
# Every page lives under the same docs root, so only the relative paths are spelled out.
_LANGGRAPH_DOCS_ROOT = "https://langchain-ai.github.io/langgraph/"
LANGGRAPH_DOCS = [
    _LANGGRAPH_DOCS_ROOT + relative_path
    for relative_path in (
        "",
        "tutorials/customer-support/customer-support/",
        "tutorials/chatbots/information-gather-prompting/",
        "tutorials/code_assistant/langgraph_code_assistant/",
        "tutorials/multi_agent/multi-agent-collaboration/",
        "tutorials/multi_agent/agent_supervisor/",
        "tutorials/multi_agent/hierarchical_agent_teams/",
        "tutorials/plan-and-execute/plan-and-execute/",
        "tutorials/rewoo/rewoo/",
        "tutorials/llm-compiler/LLMCompiler/",
        "concepts/high_level/",
        "concepts/low_level/",
        "concepts/agentic_concepts/",
        "concepts/human_in_the_loop/",
        "concepts/multi_agent/",
        "concepts/persistence/",
        "concepts/streaming/",
        "concepts/faq/",
    )
]

def get_vector_db_retriever(urls=None, chunk_size=200, chunk_overlap=0):
    """Build a Chroma-backed retriever over a set of web pages.

    Each URL is fetched with WebBaseLoader, split into token-sized chunks, embedded with
    OpenAIEmbeddings and stored in the "rag-chroma" Chroma collection.

    Args:
        urls: Pages to index. Defaults to LANGGRAPH_DOCS.
        chunk_size: Chunk size (in tiktoken tokens) for the text splitter.
        chunk_overlap: Token overlap between consecutive chunks.

    Returns:
        A retriever over the indexed chunks, using the vector store's default search.
    """
    if urls is None:
        urls = LANGGRAPH_DOCS
    # Set embeddings
    embd = OpenAIEmbeddings()
    # Load: one loader per URL, flattened into a single list of Documents
    docs_list = [doc for url in urls for doc in WebBaseLoader(url).load()]
    # Split
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    doc_splits = text_splitter.split_documents(docs_list)
    # Add to vectorstore
    vectorstore = Chroma.from_documents(
        documents=doc_splits,
        collection_name="rag-chroma",
        embedding=embd,
    )
    # The previous `as_retriever(lambda_mult=0)` had no effect: lambda_mult is an MMR setting
    # and is only applied via search_type="mmr", search_kwargs={"lambda_mult": ...}.
    # Dropping it keeps the same (default similarity) search while saying so explicitly.
    return vectorstore.as_retriever()

Our graph is getting pretty big and complex! Let's copy over what we've built in the first three modules.

In [None]:
from langchain_openai import ChatOpenAI
from langchain.schema import Document
from typing import List
from typing_extensions import TypedDict, Annotated
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from pydantic import BaseModel, Field
from langgraph.constants import Send
from collections import Counter
from langgraph.graph import StateGraph
from langgraph.graph import START, END
from IPython.display import Image, display

# Build the shared retriever once; every retrieval branch below queries it
retriever = get_vector_db_retriever()

# One chat model at temperature 0, reused for generation and for all structured-output graders
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Define our GraphState, InputState, and OutputState
def custom_documents_reducer(existing, update):
    """Merge a `documents` state update into the existing list.

    Args:
        existing: The documents currently in state (treated as empty if None).
        update: Either a list of documents to append, or a dict of the form
            {"type": "overwrite", "documents": [...]} that replaces the list outright.

    Returns:
        The new list of documents for the channel.
    """
    # An explicit overwrite request replaces whatever has accumulated so far
    if isinstance(update, dict) and update.get("type") == "overwrite":
        return update["documents"]

    # Otherwise we simply concatenate, so results from parallel retrieval branches accumulate
    return (existing or []) + update
class GraphState(TypedDict):
    """Internal state shared by the nodes of the RAG graph."""
    question: str                 # the user's question, supplied through InputState
    rewritten_queries: List[str]  # alternative phrasings written by generate_rewritten_queries
    generation: str               # NOTE(review): annotated as str, but generate_response stores the message returned by llm.invoke
    documents: Annotated[List[Document], custom_documents_reducer]   # We use Annotated to add our custom reducer (lists append, {"type": "overwrite", ...} replaces)
    attempted_generations: int    # number of generate_response runs so far
class InputState(TypedDict):
    """
    Represents the input state of our graph.

    Attributes:
        question: the user's question
    """
    question: str
class OutputState(TypedDict):
    """
    Represents the final output state of our graph.

    Attributes:
        generation: LLM generation
        documents: list of documents in state at the end of the run
    """
    generation: str
    documents: List[Document]

# Define Nodes and Conditional Edges
def retrieve_documents(state):
    """
    Fetch documents for a single query.

    The LLM first writes a short, ungrounded sample answer to the question. The retriever
    is then queried with the question and that sample answer combined, which gives it
    answer-like vocabulary to match against.

    Args:
        state (dict): Branch state holding the "question" to retrieve for

    Returns:
        state (dict): Update whose "documents" key holds the retrieved documents
    """
    print("---RETRIEVE DOCUMENTS---")
    query = state["question"]
    prompt_messages = [
        SystemMessage(content=sample_answer_system_prompt),
        HumanMessage(content=sample_answer_prompt.format(question=query)),
    ]
    hypothetical_answer = sample_answer_llm.invoke(prompt_messages).sample_answer
    # Search with the question and the sample answer together
    return {"documents": retriever.invoke(f"{query}: {hypothetical_answer}")}
def generate_response(state: GraphState):
    """
    Generate an answer to the question from the documents in state.

    Args:
        state (dict): The current graph state; reads "question", "documents" and
            (optionally) "attempted_generations".

    Returns:
        state (dict): Update containing "generation" (the chat model's reply) and the
        incremented "attempted_generations" counter.
    """
    print("---GENERATE RESPONSE---")
    question = state["question"]
    documents = state["documents"]
    attempted_generations = state.get("attempted_generations", 0)   # By default we set attempted_generations to 0 if it doesn't exist yet
    formatted_docs = "\n\n".join(doc.page_content for doc in documents)

    # RAG generation
    rag_prompt_formatted = RAG_PROMPT.format(context=formatted_docs, question=question)
    generation = llm.invoke([HumanMessage(content=rag_prompt_formatted)])
    # "documents" is deliberately not returned. custom_documents_reducer appends plain lists,
    # so echoing the unchanged documents back would duplicate them on every (re)generation.
    return {
        "question": question,
        "generation": generation,
        "attempted_generations": attempted_generations + 1   # In our state update, we increment attempted_generations
    }
# Structured-output schema for the document relevance grader.
# NOTE(review): the class docstring and Field description are presumably forwarded to the model
# as the schema description by with_structured_output — treat them as prompt text.
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""
    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )
# Chat model bound to the GradeDocuments schema; grade_documents reads `.binary_score` from its result
grade_documents_llm = llm.with_structured_output(GradeDocuments)
# System prompt and per-document user prompt (filled with {document} and {question}) for grade_documents
grade_documents_system_prompt = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_documents_prompt = "Here is the retrieved document: \n\n {document} \n\n Here is the user question: \n\n {question}"
def grade_documents(state):
    """
    Keep only the retrieved documents that the grader LLM judges relevant to the question.

    The parallel retrieval branches usually return overlapping chunks, so documents are first
    de-duplicated by page_content and only the five most frequently retrieved ones are graded.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Overwrites the documents key with only the relevant documents
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # -- Deduplicate the documents returned across our queries --
    # Keep the first Document seen for each distinct content, then take the five contents
    # retrieved most often (Counter.most_common keeps first-seen order among ties).
    first_doc_by_content = {}
    for doc in documents:
        first_doc_by_content.setdefault(doc.page_content, doc)
    doc_counter = Counter(doc.page_content for doc in documents)
    top_documents = [first_doc_by_content[content] for content, _ in doc_counter.most_common(5)]

    # Score each one of our five most common documents
    filtered_docs = []
    for d in top_documents:
        grade_documents_prompt_formatted = grade_documents_prompt.format(document=d.page_content, question=question)
        score = grade_documents_llm.invoke(
            [SystemMessage(content=grade_documents_system_prompt)] + [HumanMessage(content=grade_documents_prompt_formatted)]
        )
        # Normalise so answers such as "Yes" or " yes" are not silently treated as "no"
        grade = score.binary_score.strip().lower()
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
    # The overwrite form makes the graded documents replace (rather than extend) the list
    return {"documents": {"type": "overwrite", "documents": filtered_docs}, "question": question}
def decide_to_generate(state):
    """
    Route after grading: generate an answer if any document survived, otherwise stop.

    Args:
        state (dict): The current graph state; only "documents" is read.

    Returns:
        str: "some relevant" if graded documents remain, "none relevant" otherwise.
        The graph maps these labels to generate_response and END respectively.
    """
    print("---ASSESS GRADED DOCUMENTS---")
    filtered_documents = state["documents"]

    if not filtered_documents:
        # Every document was graded irrelevant, so there is nothing to ground an answer in
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, END---"
        )
        return "none relevant"
    # We have relevant documents, so generate an answer
    print("---DECISION: GENERATE---")
    return "some relevant"
# Structured-output schema for the groundedness (hallucination) grader.
# NOTE(review): the docstring and Field description are presumably sent to the model as the
# schema description by with_structured_output — treat them as prompt text.
class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""

    binary_score: str = Field(
        description="Answer is grounded in the facts, 'yes' or 'no'"
    )
# Chat model bound to the GradeHallucinations schema; grade_hallucinations reads `.binary_score`
grade_hallucinations_llm = llm.with_structured_output(GradeHallucinations)
# System prompt and user prompt (filled with {documents} and {generation}) for grade_hallucinations
grade_hallucinations_system_prompt = """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""
grade_hallucinations_prompt = "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"
# Once attempted_generations reaches this value, an ungrounded answer makes grade_hallucinations raise
ATTEMPTED_GENERATION_MAX = 3
def grade_hallucinations(state):
    """
    Determines whether the latest generation is grounded in the documents in state.

    Args:
        state (dict): The current graph state; reads "documents", "generation" and
            "attempted_generations".

    Returns:
        str: "supported" when the grader says the answer is grounded, otherwise
        "not supported" so that generate_response runs again.

    Raises:
        RuntimeError: If the answer is still not grounded once attempted_generations
            has reached ATTEMPTED_GENERATION_MAX.
    """

    print("---CHECK HALLUCINATIONS---")
    documents = state["documents"]
    generation = state["generation"]
    attempted_generations = state["attempted_generations"]

    formatted_docs = "\n\n".join(doc.page_content for doc in documents)
    # generate_response stores the chat model's reply message, not a plain string. Grade its
    # text content rather than the message object's string form (which also carries metadata).
    generation_text = getattr(generation, "content", generation)

    grade_hallucinations_prompt_formatted = grade_hallucinations_prompt.format(
        documents=formatted_docs,
        generation=generation_text
    )

    score = grade_hallucinations_llm.invoke(
        [SystemMessage(content=grade_hallucinations_system_prompt)] + [HumanMessage(content=grade_hallucinations_prompt_formatted)]
    )
    # Normalise so that e.g. "Yes" is not mistaken for a failed grade
    grade = score.binary_score.strip().lower()

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        return "supported"
    if attempted_generations >= ATTEMPTED_GENERATION_MAX:
        print("---DECISION: TOO MANY ATTEMPTS, GIVE UP---")
        # Fail loudly. Returning a label such as "give up" routed to END would fail silently instead.
        raise RuntimeError("Too many attempted generations with hallucinations, giving up.")
    print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
    return "not supported"
# Structured-output schema for the query-rewriting step.
# NOTE(review): the docstring and Field description are presumably sent to the model as the
# schema description by with_structured_output — treat them as prompt text.
class RewrittenQueries(BaseModel):
    """Rewritten queries based on the user's original question."""
    rewritten_queries: List[str] = Field(
        description="A list of rewritten versions of the user's query. Each rewritten version is rewritten differently, rephrased and potentially uses synonyms."
    )
# Chat model bound to RewrittenQueries; generate_rewritten_queries reads `.rewritten_queries`
rewritten_query_llm = llm.with_structured_output(RewrittenQueries)
# System prompt and user prompt (filled with {question} and {num_rewrites}) for generate_rewritten_queries
rewritten_query_system_prompt = """You are an analyst in charge of taking a user's question as input, and reframing and rewriting it in different ways.\n
Your goal is to change the phrasing of the question, while making sure that the intent and meaning of the question is the same.\n
Return a list of rewritten_queries. The number will be specified by the user."""
rewritten_query_prompt = "Here is the user's question: \n\n {question}. Return {num_rewrites} queries."
# Structured-output schema for the "sample answer" that retrieve_documents adds to its search query.
# NOTE(review): the docstring and Field description are presumably sent to the model as the
# schema description by with_structured_output — treat them as prompt text.
class SampleAnswer(BaseModel):
    """Sample answer for an input question."""
    sample_answer: str = Field(
        description="A concise example answer for a question. This shouldn't exceed three sentences in length."
    )
# Chat model bound to SampleAnswer; retrieve_documents reads `.sample_answer`
sample_answer_llm = llm.with_structured_output(SampleAnswer)
# System prompt and user prompt (filled with {question}) for the sample-answer call
sample_answer_system_prompt = """You are a novice in charge of taking a user's question as input, and generating a sample answer for it.\n
This sample answer should contain words that would likely be in a real answer, but is not grounded in any factual documents, the way a real answer would be."""
sample_answer_prompt = "Here is the user's question: \n\n {question}."
def generate_rewritten_queries(state):
    """
    Ask the LLM for three alternative phrasings of the user's question.

    Args:
        state (dict): The current graph state; only "question" is read.

    Returns:
        state (dict): Update whose rewritten_queries key holds the new phrasings
    """
    print("---GENERATING REWRITTEN VERSIONS OF THE USER'S QUERY---")
    user_prompt = rewritten_query_prompt.format(question=state["question"], num_rewrites=3)
    prompt_messages = [
        SystemMessage(content=rewritten_query_system_prompt),
        HumanMessage(content=user_prompt),
    ]
    return {"rewritten_queries": rewritten_query_llm.invoke(prompt_messages).rewritten_queries}
def continue_to_retrieval_nodes(state: GraphState):
    """Fan out one retrieve_documents branch per query: the original question, then each rewrite.

    Each branch receives only its Send payload as its state, so anything retrieve_documents
    needs must be placed in the payload explicitly. When the state has a "messages" history
    (the multi-turn version of the graph), it is forwarded so the branch can use the conversation.

    Args:
        state: The current graph state; reads "question", "rewritten_queries" and, if
            present, "messages".

    Returns:
        list[Send]: One Send to "retrieve_documents" per query, original question first.
    """
    # Forward the chat history only when this version of the state schema tracks one
    shared_payload = {"messages": state["messages"]} if "messages" in state else {}
    queries = [state["question"], *state.get("rewritten_queries", [])]
    return [Send("retrieve_documents", {"question": query, **shared_payload}) for query in queries]

# Define our graph: register the nodes (same order as before), then wire them up
graph_builder = StateGraph(GraphState, input=InputState, output=OutputState)
for node_name, node_fn in (
    ("generate_rewritten_queries", generate_rewritten_queries),
    ("retrieve_documents", retrieve_documents),
    ("generate_response", generate_response),
    ("grade_documents", grade_documents),
):
    graph_builder.add_node(node_name, node_fn)

# Entry point, then fan out one retrieval per query via Send
graph_builder.add_edge(START, "generate_rewritten_queries")
graph_builder.add_conditional_edges(
    "generate_rewritten_queries", continue_to_retrieval_nodes, ["retrieve_documents"]
)
graph_builder.add_edge("retrieve_documents", "grade_documents")

# Stop early when no document is relevant
relevance_routes = {"some relevant": "generate_response", "none relevant": END}
graph_builder.add_conditional_edges("grade_documents", decide_to_generate, relevance_routes)

# Regenerate until grounded (grade_hallucinations raises after too many attempts)
grounding_routes = {"supported": END, "not supported": "generate_response"}
graph_builder.add_conditional_edges("generate_response", grade_hallucinations, grounding_routes)

graph = graph_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

## Streaming

LangGraph is built with [first class support for streaming](https://langchain-ai.github.io/langgraph/concepts/low_level/#streaming).

### Streaming State

Let's talk about ways to [stream our graph state](https://langchain-ai.github.io/langgraph/concepts/low_level/#streaming).

`.stream` and `.astream` are sync and async methods for streaming back results. 
 
LangGraph supports a few [different streaming modes](https://langchain-ai.github.io/langgraph/how-tos/stream-values/) for [graph state](https://langchain-ai.github.io/langgraph/how-tos/stream-values/):
 
* `values`: This streams the full state of the graph after each node is called.
* `updates`: This streams updates to the state of the graph after each node is called.

![values_vs_updates.png](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/66dbaf892d24625a201744e5_streaming1.png)

Let's look at `stream_mode="updates"`.

Because we stream with `updates`, we only see the update each node makes to the state, emitted after that node runs.

Each `chunk` is a dict with `node_name` as the key and the updated state as the value.

Let's try both of these streaming modes.

In [None]:
question = "Can I use LangGraph for customer support? I want to create an agent application."
# Each item maps the node that just ran to the state update it returned
stream = graph.stream({"question": question}, stream_mode="updates")
for update in stream:
    print(update)

Now, let's just look at the keys for simplicity

In [None]:
question = "Can I use LangGraph for customer support? I want to create an agent application."
# Only show which node produced each update
for update in graph.stream({"question": question}, stream_mode="updates"):
    node_names = update.keys()
    print(node_names)

Now, let's stream the full state every time with `values`

In [None]:
question = "Can I use LangGraph for customer support? I want to create an agent application."
# "values" yields the full state after each step; print which keys are populated so far
for full_state in graph.stream({"question": question}, stream_mode="values"):
    state_keys = full_state.keys()
    print(state_keys)

### Streaming Events

We often want to stream more than graph state.

In particular, with chat model calls it is common to stream the tokens as they are generated.

We can do this [using the `.astream_events` method](https://langchain-ai.github.io/langgraph/how-tos/streaming-from-final-node/#stream-outputs-from-the-final-node), which streams back events as they happen inside nodes!

Each event is a dict with a few keys:
 
* `event`: This is the type of event that is being emitted. 
* `name`: This is the name of the runnable that emitted the event (e.g., a node or a chat model).
* `data`: This is the data associated with the event.
* `metadata`: Contains `langgraph_node`, the node emitting the event.

Let's have a look.

In [None]:
question = "Can I use LangGraph for customer support? I want to create an agent application."
async for event in graph.astream_events({"question": question}, version="v2"):
    print(f"Node: {event['metadata'].get('langgraph_node','')}. Type: {event['event']}. Name: {event['name']}")

The central point is that tokens from chat models within your graph have the `on_chat_model_stream` type.

We can use `event['metadata']['langgraph_node']` to select the node to stream from.

And we can use `event['data']` to get the actual data for each event. For `on_chat_model_stream` events it is a dict whose `chunk` entry is an `AIMessageChunk` holding the newly generated token.

In [None]:
node_to_stream = 'generate_response'
question = "Can I use LangGraph for customer support? I want to create an agent application."
async for event in graph.astream_events({"question": question}, version="v2"):
    # Get chat model tokens from a particular node 
    if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
        print(event["data"])

We can see the response for this particular node stream back one token at a time!

## Memory

In every example so far, [state has been transient](https://github.com/langchain-ai/langgraph/discussions/352#discussioncomment-9291220) to a single graph execution. If we invoke our graph for a second time, we are starting with a fresh state.

This limits our ability to have multi-turn conversations with interruptions. 

We can use [persistence](https://langchain-ai.github.io/langgraph/how-tos/persistence/) to address this! 
 
LangGraph can use a checkpointer to automatically save the graph state after each step. This built-in persistence layer gives us memory, allowing LangGraph to pick up from the last state update. 

### Adjusting our State for multi-turn conversations

Before we set up memory in our application, let's edit our State and Nodes so that instead of acting on a single "question", we instead act on a list of "questions and answers".

We'll call our list "messages". These existing messages will all be used for our retrieval step. And at the end of our flow when our LLM responds, we will add the latest question and answer to our "messages" history. 

In [11]:
from langchain_core.messages import AnyMessage, get_buffer_string
import operator

class GraphState(TypedDict):
    """Internal graph state for the multi-turn version; adds a running chat history."""
    question: str  # the question for the current turn
    messages: Annotated[List[AnyMessage], operator.add]     # We now track a list of messages (updates are appended via operator.add)
    generation: str  # NOTE(review): annotated as str, but holds the message returned by llm.invoke
    rewritten_queries: List[str]
    documents: Annotated[List[Document], custom_documents_reducer]
    attempted_generations: int  # reset to 0 by configure_memory after each accepted answer

class InputState(TypedDict):
    """Input accepted by the graph: just the new question."""
    question: str

class OutputState(TypedDict):
    """Output returned to the caller: the chat history and the documents in state."""
    messages: Annotated[List[AnyMessage], operator.add]     # We output messages now in our OutputState
    documents: List[Document]  # configure_memory empties this after an accepted answer

Now let's edit our existing Nodes to take the `messages` history into account when generating sample answers, grading document relevance, and generating a response.

In [12]:
sample_answer_system_prompt = """You are a novice in charge of taking a conversation and a user's latest question as input, and generating a sample answer for the latest question.\n
This sample answer should contain words that would likely be in a real answer, but is not grounded in any factual documents, the way a real answer would be."""
# Fixed the misplaced period that previously followed the "\n\n" after the conversation
sample_answer_prompt = "This is the conversation so far: \n\n {conversation} \n\n Here is the user's latest question: \n\n {question}."
def retrieve_documents(state):
    """
    Retrieve documents for one query, using the conversation so far as extra context.

    A throwaway sample answer is generated from the conversation and the question, and the
    retriever is queried with "<question>: <sample answer>".

    Args:
        state (dict): The branch state; reads "question" and, if present, "messages".

    Returns:
        state (dict): Update whose "documents" key holds the retrieved documents.
    """
    print("---RETRIEVE DOCUMENTS---")
    question = state["question"]
    # Branch states come from Send payloads, which may not carry the chat history
    messages = state.get("messages", [])
    conversation = get_buffer_string(messages)
    print(conversation)

    sample_answer_prompt_formatted = sample_answer_prompt.format(question=question, conversation=conversation)  # We use the entire chat history from messages to generate sample answers
    response = sample_answer_llm.invoke(
        [SystemMessage(content=sample_answer_system_prompt)] + [HumanMessage(content=sample_answer_prompt_formatted)]
    )
    sample_answer = response.sample_answer
    documents = retriever.invoke(f"{question}: {sample_answer}")    # Now we use our question and sample answer
    return {"documents": documents}

In [13]:
# Conversation-aware grading prompts; grade_documents fills {document}, {conversation} and {question}.
grade_documents_system_prompt = """You are a grader assessing relevance of a retrieved document to a conversation between a user and an AI assistant, and user's latest question. \n 
    If the document contains keyword(s) or semantic meaning related to the user question, definitely grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals that are not relevant at all. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_documents_prompt = "Here is the retrieved document: \n\n {document} \n\n Here is the conversation so far: \n\n {conversation} \n\n Here is the user question: \n\n {question}"
def grade_documents(state):
    """
    Keep only the retrieved documents the grader judges relevant to the conversation.

    Documents are de-duplicated by page_content and only the five most frequently retrieved
    ones are graded. The current question is also appended to the chat history here.

    Args:
        state (dict): The current graph state; reads "question", "documents" and "messages".

    Returns:
        state (dict): Overwrites documents with the relevant ones and appends the question
        to messages as a HumanMessage.
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]
    conversation = get_buffer_string(state["messages"])

    # -- Deduplicate the documents returned across our queries --
    # Keep the first Document seen for each distinct content, then take the five contents
    # retrieved most often (Counter.most_common keeps first-seen order among ties).
    first_doc_by_content = {}
    for doc in documents:
        first_doc_by_content.setdefault(doc.page_content, doc)
    doc_counter = Counter(doc.page_content for doc in documents)
    top_documents = [first_doc_by_content[content] for content, _ in doc_counter.most_common(5)]

    # Score each one of our five most common documents
    filtered_docs = []
    for d in top_documents:
        grade_documents_prompt_formatted = grade_documents_prompt.format(document=d.page_content, question=question, conversation=conversation)
        score = grade_documents_llm.invoke(
            [SystemMessage(content=grade_documents_system_prompt)] + [HumanMessage(content=grade_documents_prompt_formatted)]
        )
        # Normalise so answers such as "Yes" or " yes" are not silently treated as "no"
        grade = score.binary_score.strip().lower()
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
    return {"documents": {"type": "overwrite", "documents": filtered_docs}, "question": question, "messages": [HumanMessage(content=question)]}   # We add the question to Messages here

In [14]:
def generate_response(state: GraphState):
    """
    Generate an answer to the latest question, using the whole conversation as context.

    Args:
        state (dict): The current graph state; reads "documents", "messages" and
            (optionally) "attempted_generations".

    Returns:
        state (dict): Update containing "generation" (the chat model's reply message) and
        the incremented "attempted_generations" counter.
    """
    print("---GENERATE RESPONSE---")
    documents = state["documents"]
    conversation = get_buffer_string(state["messages"])
    attempted_generations = state.get("attempted_generations", 0)   # By default we set attempted_generations to 0 if it doesn't exist yet
    formatted_docs = "\n\n".join(doc.page_content for doc in documents)

    # RAG generation
    rag_prompt_formatted = RAG_PROMPT_WITH_MESSAGES.format(context=formatted_docs, conversation=conversation)
    generation = llm.invoke([HumanMessage(content=rag_prompt_formatted)])
    # "documents" is deliberately not returned. custom_documents_reducer appends plain lists,
    # so echoing the unchanged documents back would duplicate them on every (re)generation.
    return {
        "generation": generation,
        "attempted_generations": attempted_generations + 1   # In our state update, we increment attempted_generations
    }

Awesome! We'll add one more node to our graph, which configures memory ahead of our next invocation.

In [15]:
def configure_memory(state):
    """Prepare state for the next turn once an answer has been accepted.

    Appends the accepted generation to the chat history and clears the per-turn fields
    (attempt counter and documents) so that the next question starts fresh.

    Args:
        state (dict): The current graph state; only "generation" is read.

    Returns:
        dict: The state update described above.
    """
    # The overwrite form empties the reducer-managed documents list instead of appending to it
    cleared_documents = {"type": "overwrite", "documents": []}
    return {
        "messages": [state["generation"]],
        "attempted_generations": 0,
        "documents": cleared_documents,
    }

### Local Memory

Cool, now let's define our graph and add some local memory!

One of the easiest checkpointers to work with is `MemorySaver`, an in-memory key-value store for graph state.

All we need to do is compile the graph with a checkpointer, and our graph has memory!

In [None]:
from langgraph.checkpoint.memory import MemorySaver

# Define our graph: same topology as before, plus a configure_memory step before END
graph_builder = StateGraph(GraphState, input=InputState, output=OutputState)
for node_name, node_fn in (
    ("generate_rewritten_queries", generate_rewritten_queries),
    ("retrieve_documents", retrieve_documents),
    ("generate_response", generate_response),
    ("grade_documents", grade_documents),
    ("configure_memory", configure_memory),    # New node for configuring memory
):
    graph_builder.add_node(node_name, node_fn)

graph_builder.add_edge(START, "generate_rewritten_queries")
graph_builder.add_conditional_edges(
    "generate_rewritten_queries", continue_to_retrieval_nodes, ["retrieve_documents"]
)
graph_builder.add_edge("retrieve_documents", "grade_documents")
graph_builder.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {"some relevant": "generate_response", "none relevant": END},
)
# A grounded answer now passes through configure_memory (which stores it in messages) before END
graph_builder.add_conditional_edges(
    "generate_response",
    grade_hallucinations,
    {"supported": "configure_memory", "not supported": "generate_response"},
)
graph_builder.add_edge("configure_memory", END)

# In-process checkpointer: graph state is saved per thread_id after every step
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))

### Threads

When we use memory, we need to specify a `thread_id`.

This `thread_id` will store our collection of graph states.

* The checkpointer writes the state at every step of the graph
* These checkpoints are saved in a thread 
* We can access that thread in the future using the `thread_id`


In [None]:
question = "Can I use LangGraph for customer support? I want to create an agent application."
thread_config = {"thread_id": "2"}
config = {"configurable": thread_config}
# Reusing this thread_id later lets the checkpointer reload this conversation's state
for state_snapshot in graph.stream({"question": question}, config, stream_mode="values"):
    print(state_snapshot)

Let's ask a follow-up with the same thread_id!

In [None]:
question = "Can I use OSS models for this?"
thread_config = {"thread_id": "2"}
config = {"configurable": thread_config}
# Same thread_id as the previous cell, so the earlier messages are loaded from the checkpointer
for state_snapshot in graph.stream({"question": question}, config, stream_mode="values"):
    print(state_snapshot)

__Let's take a look in LangSmith__

### External Memory

In the previous example, we compiled our graph with local memory - but realistically, you'll want to hook this up to a database. Here, we'll show how to use [Sqlite as a checkpointer](https://langchain-ai.github.io/langgraph/concepts/low_level/#checkpointer), but other checkpointers, such as [Postgres](https://langchain-ai.github.io/langgraph/how-tos/persistence_postgres/) are available!

Create a directory called `state_db/` inside your notebooks folder; SQLite will create the `example.db` file in it the first time we connect.

In [19]:
import sqlite3
from pathlib import Path

from langgraph.checkpoint.sqlite import SqliteSaver

db_path = "state_db/example.db"
# sqlite3.connect creates the database file on demand, but not missing parent directories
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
# check_same_thread=False allows the connection to be used from threads other than the one
# that opened it, which the checkpointer may do
conn = sqlite3.connect(db_path, check_same_thread=False)
memory = SqliteSaver(conn)
graph = graph_builder.compile(checkpointer=memory)

Now let's invoke our graph

In [None]:
# Thread "2" again, but this graph now checkpoints to the SQLite database, which
# has its own, separate set of threads from the earlier in-memory checkpointer.
config = {"configurable": {"thread_id": "2"}}
question = "Can I use LangGraph for customer support? I want to create an agent application."
graph.invoke({"question": question}, config)

Let's confirm that our state is still saved, now in the local SQLite database.

In [None]:
# Load the latest checkpoint for thread "2" from the SQLite checkpointer.
config = {"configurable": {"thread_id": "2"}}
graph_state = graph.get_state(config)
# Last expression of the cell: displayed as the cell output.
graph_state

Using a database like SQLite means state is persisted!

For example, we can restart the notebook kernel and still load the saved state from the SQLite database on disk.


## Breakpoints and Human-in-the-loop

Now, let's talk about the motivations for `human-in-the-loop`:

(1) `Approval` - We can interrupt our agent, surface state to a user, and allow the user to accept an action

(2) `Debugging` - We can rewind the graph to reproduce or avoid issues

(3) `Editing` - We can modify the state

LangGraph offers several ways to get or update agent state to support various `human-in-the-loop` workflows.

First, we'll introduce [breakpoints](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/breakpoints/#simple-usage), which provide a simple way to stop the graph at specific steps. 

We'll show how this enables user `approval`.

Let's assume that we are concerned about the quality of our retrieved documents, even after grading. All we need to do is simply compile the graph with `interrupt_after=["grade_documents"]`.

This means that the execution will be interrupted after the node `grade_documents` executes. Now, we can inspect the documents before generating a response.

In [22]:
# Static breakpoint: every run pauses right after the grade_documents node executes.
graph = graph_builder.compile(checkpointer=memory, interrupt_after=["grade_documents"])

In [None]:
# New thread "3"; streaming stops at the breakpoint after grade_documents.
config = {"configurable": {"thread_id": "3"}}
question = "Can I use LangGraph for building a code assistant?"
for chunk in graph.stream({"question": question}, config, stream_mode="values"):
    print(chunk)

We can get the state and look at the next node to call.

This is a nice way to see that the graph has been interrupted.

In [None]:
# `next` names the node(s) that will run when the paused graph is resumed.
state = graph.get_state(config)
state.next

Now, we'll introduce a nice trick.

When we invoke the graph with `None`, it will just continue from the last state checkpoint!

![breakpoints.jpg](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/66dbae7985b747dfed67775d_breakpoints1.png)

For clarity, LangGraph will re-emit the current state.

And then it will proceed to execute the following steps in the graph, which start with the `generate_response` node.

In [None]:
# Passing None as the input resumes thread "3" from its last checkpoint
# instead of starting a new run.
for chunk in graph.stream(None, config, stream_mode="values"):
    print(chunk)

Now, let's bring these together with a specific user approval step that accepts user input.

This is our human-in-the-loop!

In [None]:
# New thread "4"; the graph pauses after grade_documents (interrupt_after).
config = {"configurable": {"thread_id": "4"}}
question = "Can I use LangGraph for building a customer support bot?"
for chunk in graph.stream({"question": question}, config, stream_mode="values"):
    print(chunk)

# Get user feedback
user_approval = input("Do these documents look good to you? (yes/no): ")

# Check approval, tolerating surrounding whitespace, any letter case and "y".
if user_approval.strip().lower() in ("yes", "y"):
    # If approved, resume from the saved checkpoint by streaming with None as input.
    for chunk in graph.stream(None, config, stream_mode="values"):
        print(chunk)
else:
    print("Operation cancelled by user.")

Note that you can also specify `interrupt_before` to pause execution before a certain node executes.

__Let's take a look in LangSmith__

### Dynamic Breakpoints

Breakpoints are set by the developer on a specific node during graph compilation. 

But sometimes it is helpful to allow the graph to **dynamically interrupt** itself!

This is an internal breakpoint, and [can be achieved using `NodeInterrupt`](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/dynamic_breakpoints/#run-the-graph-with-dynamic-interrupt).

This has a few specific benefits: 

(1) you can do it conditionally (from inside a node based on developer-defined logic).

(2) you can communicate to the user why it was interrupted (by passing whatever you want to the `NodeInterrupt`).

Let's add a `NodeInterrupt` to our `grade_documents` node instead of always interrupting after it! Specifically, let's only interrupt if no relevant documents can be found, instead of directly ending execution.

In [27]:
from langgraph.errors import NodeInterrupt


def grade_documents(state):
    """Grade retrieved documents for relevance, pausing the graph if none are relevant.

    The retrieved documents may contain duplicates (the same content returned for
    several queries). They are deduplicated by ``page_content``, and the five most
    frequently retrieved distinct documents are each graded by
    ``grade_documents_llm`` against the question and the conversation so far.

    Args:
        state: Graph state; reads ``question``, ``documents`` and, if present,
            ``messages``.

    Returns:
        A state update that overwrites ``documents`` with the relevant ones,
        echoes ``question`` and appends the question as a ``HumanMessage``.

    Raises:
        NodeInterrupt: if no document is graded relevant. This dynamically pauses
            the graph at this node so a human can inspect or edit the state.
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]
    # `messages` may not be set yet (e.g. first run on a thread); default to no
    # history, matching the non-interrupting version of this node.
    conversation = get_buffer_string(state.get("messages", []))

    # -- Deduplicate documents retrieved across our queries --
    # Keep the first Document seen for each distinct page_content, then take the
    # five contents retrieved most often (equal counts keep first-seen order).
    first_by_content = {}
    for doc in documents:
        first_by_content.setdefault(doc.page_content, doc)
    doc_counter = Counter(doc.page_content for doc in documents)
    top_documents = [first_by_content[content] for content, _ in doc_counter.most_common(5)]

    # Score each of our (up to) five most common documents
    filtered_docs = []
    for d in top_documents:
        grade_documents_prompt_formatted = grade_documents_prompt.format(
            document=d.page_content, question=question, conversation=conversation
        )
        score = grade_documents_llm.invoke(
            [SystemMessage(content=grade_documents_system_prompt), HumanMessage(content=grade_documents_prompt_formatted)]
        )
        if score.binary_score == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")

    # Dynamic breakpoint: instead of ending the run, pause here and tell the user
    # why. (Interpolating filtered_docs would always show an empty list.)
    if not filtered_docs:
        raise NodeInterrupt(f"There are no relevant documents for the question: {question!r}")

    return {"documents": {"type": "overwrite", "documents": filtered_docs}, "question": question, "messages": [HumanMessage(content=question)]}

Let's try it out!

In [None]:
# Define our graph (same topology as before; grade_documents is now the
# NodeInterrupt version defined in the previous cell)
graph_builder = StateGraph(GraphState, input=InputState, output=OutputState)
graph_builder.add_node("generate_rewritten_queries", generate_rewritten_queries)
graph_builder.add_node("retrieve_documents", retrieve_documents)
graph_builder.add_node("generate_response", generate_response)
graph_builder.add_node("grade_documents", grade_documents)
graph_builder.add_node("configure_memory", configure_memory)

graph_builder.add_edge(START, "generate_rewritten_queries")
# continue_to_retrieval_nodes may only target retrieve_documents; presumably it
# fans out one retrieval per rewritten query (TODO confirm in its definition).
graph_builder.add_conditional_edges(
    "generate_rewritten_queries",
    continue_to_retrieval_nodes,
    ["retrieve_documents"]
)
graph_builder.add_edge("retrieve_documents", "grade_documents")
# NOTE(review): grade_documents now raises NodeInterrupt instead of returning an
# empty document list, so the "none relevant" -> END branch is likely never
# taken — confirm against decide_to_generate.
graph_builder.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "some relevant": "generate_response",
        "none relevant": END
    })
graph_builder.add_conditional_edges(
    "generate_response",
    grade_hallucinations,
    {
        "supported": "configure_memory",
        "not supported": "generate_response"
    })
graph_builder.add_edge("configure_memory", END)

# Fresh in-memory checkpointer; no static breakpoints this time, the only pause
# comes from the NodeInterrupt inside grade_documents.
memory = MemorySaver()

graph = graph_builder.compile(checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Thread "31": a question presumably outside the indexed documents, so
# grade_documents is expected to find nothing relevant and raise NodeInterrupt.
config = {"configurable": {"thread_id": "31"}}
question = "Who is Man City's coach?"
graph.invoke({"question": question}, config)

In [None]:
# After a NodeInterrupt, `next` still points at the node that interrupted.
state = graph.get_state(config)
print(state.next)

In [None]:
# Resuming without changing the state re-runs grade_documents, which interrupts again.
graph.invoke(None, config)

### Editing State

We're stuck here! Resuming simply re-runs `grade_documents`, which again finds no relevant documents and raises the same `NodeInterrupt`, so we can't make progress. To complete this toy example, we can edit the state with `update_state` and then continue. Let's add a document about Man City's coach.

Note: These updates use the same reducers that you defined in your State class.

In [None]:
# Write the update as if grade_documents had produced it, so that node counts as
# done and resuming follows its outgoing edges. The {"type": "overwrite", ...}
# value is interpreted by the documents reducer defined in the State class
# (presumably replacing the current list rather than appending — confirm).
graph.update_state(
    config,
    {"documents": {"type": "overwrite", "documents": [Document(page_content="Man City's coach is Pep Guardiola!")]}},
    as_node="grade_documents"
)

In [None]:
# Resume from the edited checkpoint, continuing after grade_documents.
graph.invoke(None, config)

### Awaiting User Input as a Node

So, it's clear that we can edit our agent state after a breakpoint.

Now, what if we want to allow for human feedback to perform this state update?

We'll add a node that [serves as a placeholder for human feedback](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/wait-user-input/#setup) within our agent.

This `human_feedback` node allows the user to add feedback directly to the state.
 
We specify the breakpoint using `interrupt_before` our `human_feedback` node.

We set up a checkpointer to save the state of the graph up until this node.

In [34]:
def human_feedback(state: GraphState):
    """Placeholder node marking where the graph waits for human input.

    The graph is meant to be compiled with ``interrupt_before=["human_feedback"]``,
    so execution pauses before this node; feedback is injected into the state
    with ``graph.update_state``. The node itself makes no state update.
    """
    return None

Let's get rid of our NodeInterrupt in `grade_documents`

In [35]:
def grade_documents(state):
    """Keep only the retrieved documents that the grader LLM judges relevant.

    Unlike the interrupting variant, this node never pauses the graph: when no
    document is relevant it returns an empty document list, and the conditional
    edge after this node decides where execution goes next.

    Args:
        state: Graph state; reads ``question``, ``documents`` and, if present,
            ``messages``.

    Returns:
        A state update overwriting ``documents`` with the relevant ones, echoing
        ``question`` and appending the question as a ``HumanMessage``.
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    retrieved = state["documents"]
    history = get_buffer_string(state.get("messages", []))

    # Deduplicate documents across our queries: remember the first Document per
    # distinct page_content and rank contents by how often they were retrieved.
    representative = {}
    for doc in retrieved:
        representative.setdefault(doc.page_content, doc)
    frequency = Counter(doc.page_content for doc in retrieved)
    candidates = [representative[text] for text, _count in frequency.most_common(5)]

    # Ask the grader about each candidate; keep the ones scored "yes".
    relevant = []
    for doc in candidates:
        prompt = grade_documents_prompt.format(
            document=doc.page_content, question=question, conversation=history
        )
        verdict = grade_documents_llm.invoke(
            [SystemMessage(content=grade_documents_system_prompt), HumanMessage(content=prompt)]
        )
        if verdict.binary_score != "yes":
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
        print("---GRADE: DOCUMENT RELEVANT---")
        relevant.append(doc)

    return {
        "documents": {"type": "overwrite", "documents": relevant},
        "question": question,
        "messages": [HumanMessage(content=question)],
    }

In [None]:
# Define our graph, now with a human_feedback node on the "none relevant" branch
graph_builder = StateGraph(GraphState, input=InputState, output=OutputState)
graph_builder.add_node("generate_rewritten_queries", generate_rewritten_queries)
graph_builder.add_node("retrieve_documents", retrieve_documents)
graph_builder.add_node("generate_response", generate_response)
graph_builder.add_node("grade_documents", grade_documents)
graph_builder.add_node("configure_memory", configure_memory)
graph_builder.add_node("human_feedback", human_feedback)

graph_builder.add_edge(START, "generate_rewritten_queries")
graph_builder.add_conditional_edges(
    "generate_rewritten_queries",
    continue_to_retrieval_nodes,
    ["retrieve_documents"]
)
graph_builder.add_edge("retrieve_documents", "grade_documents")
# When nothing is relevant, route to human_feedback instead of ending the run.
graph_builder.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "some relevant": "generate_response",
        "none relevant": "human_feedback"
    })
# After the human has supplied documents, always go on to generate an answer.
graph_builder.add_edge("human_feedback", "generate_response")
graph_builder.add_conditional_edges(
    "generate_response",
    grade_hallucinations,
    {
        "supported": "configure_memory",
        "not supported": "generate_response"
    })
graph_builder.add_edge("configure_memory", END)

memory = MemorySaver()

# Pause before human_feedback runs so the user can inject state with update_state.
graph = graph_builder.compile(checkpointer=memory, interrupt_before=["human_feedback"])
display(Image(graph.get_graph().draw_mermaid_png()))

Now execution stops before our `human_feedback` node and continues once we receive human feedback!

In [None]:
config = {"configurable": {"thread_id": "32"}}
question = "Who is Man City's coach?"
graph.invoke({"question": question}, config)

# The graph only pauses (interrupt_before=["human_feedback"]) when no document was
# graded relevant; otherwise it already ran to completion and needs no feedback.
if "human_feedback" in graph.get_state(config).next:
    user_input = input("Add a document to state: ")
    if user_input.strip():
        # Record the feedback as the output of the human_feedback node itself; its
        # outgoing edge then leads unconditionally to generate_response on resume.
        graph.update_state(
            config,
            {"documents": {"type": "overwrite", "documents": [Document(page_content=user_input)]}},
            as_node="human_feedback"
        )
        display(graph.invoke(None, config))
    else:
        print("No document provided; the graph stays paused before human_feedback.")
else:
    print("The graph was not interrupted, so no human feedback is needed.")


__Let's take a look in LangSmith__

__Let's take a look in LangGraph Studio__