# OSS (LangChain) 

## Install and helper methods

In [None]:
%pip install -q langchain-core
%pip install -q langchain-openai
%pip install -q langchain-experimental
%pip install --pre -U -q langchain

### Helper methods

In [136]:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, BaseMessage
from langchain_ollama import ChatOllama

local_model_id = "qwen2.5-coder:7b-instruct-q8_0"

# Pretty print
def pretty_print(response:dict):
    """Prints the content of each message in the response dictionary."""
    messages = response["messages"]
    for message in messages:
        print(message.content)

def print_dictionary(response:dict):
    # This method prints each message's content, role, and its metadata, with indentation for readability
    messages = response.get("messages", [])
    for idx, message in enumerate(messages):
        role = getattr(message, "role", message.__class__.__name__.replace("Message", "").lower())
        print(f"Message {idx+1}:")
        print(f"    Role: {role}")
        print(f"    Content: {message.content}\n")
        # Print additional metadata if present
        if hasattr(message, "response_metadata") and message.response_metadata:
            print(f"    Metadata: {message.response_metadata}\n")
        if hasattr(message, "additional_kwargs") and message.additional_kwargs:
            print(f"    Additional kwargs: {message.additional_kwargs}\n")
        if hasattr(message, "usage_metadata") and getattr(message, "usage_metadata", None):
            print(f"    Usage metadata: {message.usage_metadata}\n")


def create_initial_message(text="Hi!"):
    """Creates a message instance with the given text."""
    
    return {"messages": [format_message(text)]}


def format_message(text="Hi! My name is Bob.", MessageClass: type = HumanMessage):
    """Creates a single message instance."""
    return MessageClass(content=text)


def create_model(model_id = "gpt-4.1-nano", local = False):
    # uses openAPI_key 
    if local:
        model = ChatOllama(
            model=model_id,
            temperature=0,
            )
    else:
        model = ChatOpenAI(
            model=model_id,
            timeout=30
            )
    
    return model

### Create sequential thread_id for checkpoints

In [101]:
# %pip install psycopg2-binary
import psycopg2
from psycopg2 import sql

DB_URI = "postgresql://thomas:postgres@localhost:5432/postgres?sslmode=disable"

def get_next_thread_id():
    """
    Ensures the sequence exists and returns the next thread_id.
    """
    with psycopg2.connect(DB_URI) as conn:
        with conn.cursor() as cur:
            # Create the sequence if it doesn't exist
            cur.execute("""
                DO $$
                BEGIN
                    IF NOT EXISTS (SELECT 1 FROM pg_class WHERE relkind='S' AND relname='thread_id_seq') THEN
                        CREATE SEQUENCE thread_id_seq START 1;
                    END IF;
                END
                $$;
            """)

            # Get the next value from the sequence
            cur.execute("SELECT nextval('thread_id_seq')")
            new_thread_id = cur.fetchone()[0]

    return new_thread_id



## Simple processor

In [82]:
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
  os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-5-nano", model_provider="openai")

query = input()
if query:
  result = model.invoke(query)
  print(result)

## Simple Agent 

#### Basic agent

In [None]:
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI

model = create_model()

tools = [calculate, ]
agent = create_agent(
    model, 
    tools=tools,
    prompt=SystemMessage(content="You are a research assistant. Cite your sources.")
)


# query = create_initial_message(input())
if query:
    # result = agent.invoke(query)
    print_dictionary(result)

Message 1:
    Role: human
    Content: What is 2+2

Message 2:
    Role: ai
    Content: 2 + 2 equals 4.

    Metadata: {'token_usage': {'completion_tokens': 8, 'prompt_tokens': 27, 'total_tokens': 35, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4.1-nano-2025-04-14', 'system_fingerprint': 'fp_7c233bf9d1', 'id': 'chatcmpl-CK1xCOevXfNYa23yEAwOSTsT1qGtq', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}

    Additional kwargs: {'refusal': None}

    Usage metadata: {'input_tokens': 27, 'output_tokens': 8, 'total_tokens': 35, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}



#### Choose model dynamicly

In [109]:
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent, AgentState
from langgraph.runtime import Runtime


def select_model(state: AgentState, runtime: Runtime) -> ChatOpenAI:
    """Choose model based on conversation complexity."""
    messages = state["messages"]
    message_count = len(messages)
    if message_count < 3:
        return ChatOpenAI(model="gpt-4.1-nano").bind_tools(tools)
    else:
        return ChatOpenAI(model="gpt-5-nano").bind_tools(tools) # select a better model for longer conversations

agent = create_agent(select_model, tools=[])

# Suppose this is the current agent state
state = AgentState(messages=[format_message("Hi", MessageClass=HumanMessage, first=False), format_message("Hi how may I assist you?", AIMessage, first=false) ])

# Get the model instance that would be used for this state
model_instance = select_model(state, runtime=None)
print(model_instance.model_name)

state = AgentState(messages=[
    format_message("Hello"),
    format_message("Hi how may I assist you?", AIMessage), 
    format_message("Lorem ipsum")]
    )

model_instance = select_model(state, runtime=None)
print(model_instance.model_name) 


TypeError: format_message() got an unexpected keyword argument 'first'

#### Use local model

In [None]:
# %pip install -qU langchain-ollama
from langchain_ollama import ChatOllama
local_model_id = "qwen2.5-coder:7b-instruct-q8_0"

llm = ChatOllama(
    model=local_model_id,
    temperature=0,
)

messages = [
    (
        "system",
        "You are a helpful assistant that translates English to French. Translate the user sentence.",
    ),
    ("human", "I love programming."),
]
# ai_msg = llm.invoke(messages)
# # pretty_print(ai_msg)
# print(ai_msg)

content="J'aime programmer." additional_kwargs={} response_metadata={'model': 'qwen2.5-coder:7b-instruct-q8_0', 'created_at': '2025-09-29T05:55:21.770835Z', 'done': True, 'done_reason': 'stop', 'total_duration': 10860239333, 'load_duration': 5122468292, 'prompt_eval_count': 33, 'prompt_eval_duration': 5244246084, 'eval_count': 6, 'eval_duration': 463770666, 'model_name': 'qwen2.5-coder:7b-instruct-q8_0'} id='run--eac17565-5cad-4167-b560-7beb7d497601-0' usage_metadata={'input_tokens': 33, 'output_tokens': 6, 'total_tokens': 39}


#### Chaining

In [None]:
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant that translates {input_language} to {output_language}.",
        ),
        ("human", "{input}"),
    ]
)

chain = prompt | llm
chain.invoke(
    {
        "input_language": "English",
        "output_language": "German",
        "input": "I love programming.",
    }
)



AIMessage(content='Ich liebe Programmieren.', additional_kwargs={}, response_metadata={'model': 'qwen2.5-coder:7b-instruct-q8_0', 'created_at': '2025-09-29T05:44:24.696387Z', 'done': True, 'done_reason': 'stop', 'total_duration': 1417916167, 'load_duration': 62493333, 'prompt_eval_count': 28, 'prompt_eval_duration': 892272084, 'eval_count': 6, 'eval_duration': 455109541, 'model_name': 'qwen2.5-coder:7b-instruct-q8_0'}, id='run--12083a26-163f-4555-bf82-bdb1fc02726d-0', usage_metadata={'input_tokens': 28, 'output_tokens': 6, 'total_tokens': 34})

#### Streaming the model invokes - for long queries

In [None]:
agent = create_agent(create_model(),[])

for chunk in agent.stream({
    "messages": [{"role": "user", "content": "Search for AI news and summarize the findings"}]
}, stream_mode="values"):
    # Each chunk contains the full state at that point
    latest_message = chunk["messages"][-1]
    if latest_message.content:
        print(f"Agent: {latest_message.content}")
    elif latest_message.tool_calls:
        print(f"Calling tools: {[tc['name'] for tc in latest_message.tool_calls]}")

#### Set prompt Dynamicly

In [None]:
def dynamic_prompt(state):
    user_type = state.get("user_type", "standard")
    system_msg = SystemMessage(
        content="Provide detailed technical responses." if user_type == "expert" else "Provide simple, clear explanations."
    )
    return [system_msg] + state["messages"]
agent = create_agent(model, tools, prompt=dynamic_prompt)
q

#### Multi-turn conversations

In [None]:
from langchain_core.messages import filter_messages

def manage_conversation_window(messages, max_messages=10):
    """Keep only the system message and last N messages"""
    system_msgs = filter_messages(messages, include_types="system")
    recent_msgs = messages[-(max_messages-len(system_msgs)):]
    return system_msgs + recent_msgs

messages = [SystemMessage(content="Your a helpful assistant")]

# Usage in conversation loop
while True:
    user_input = input("You: ")
    if user_input.lower() == "quit":
        break

    messages.append(HumanMessage(content=user_input))

    # Trim conversation to fit context window
    messages = manage_conversation_window(messages)

    response = model.invoke(messages)
    messages.append(response)

## Tools

### Create basic tools

In [155]:
from langchain_core.tools import tool

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Perform calculations."""
    return str(5)
    return str(eval(expression))


@tool
def search_database(query: str, limit: int = 10) -> str:
    """Search the customer database for records matching the query.

    Args:
        query: Search terms to look for
        limit: Maximum number of results to return
    """
    return f"Found {limit} results for '{query}'"


### Create structured output

#### Simple output format 

In [None]:
from pydantic import BaseModel, Field
from typing import Literal, Optional
from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy

from typing import Union

class ProductReview(BaseModel):
    """Analysis of a product review."""
    rating: Optional[int] = Field(description="The rating of the product", ge=1, le=5)
    sentiment: Literal["positive", "negative"] = Field(description="The sentiment of the review")
    key_points: list[str] = Field(description="The key points of the review. Lowercase, 1-3 words each.")

model = create_model(local_model_id, True)
agent = create_agent(
    model=model,
    tools=tools,
    response_format=ProductReview

)

result = agent.invoke({
    "messages": [{"role": "user", "content": "Analyze this review: 'Great product: 5 out of 5 stars. Fast shipping, but expensive'"}]
})
# result["structured_response"]
result["messages"][1]
result
# ProductReview(rating=5, sentiment='positive', key_points=['fast shipping', 'expensive'])

{'messages': [HumanMessage(content="Analyze this review: 'Great product: 5 out of 5 stars. Fast shipping, but expensive'", additional_kwargs={}, response_metadata={}, id='d37a43b1-cd82-4cc5-a1fb-5c7ec3aa05c0'),
  AIMessage(content='{\n  "name": "ProductReview",\n  "arguments": {\n    "rating": 5,\n    "sentiment": "positive",\n    "key_points": [\n      "great product",\n      "fast shipping",\n      "expensive"\n    ]\n  }\n}', additional_kwargs={}, response_metadata={'model': 'qwen2.5-coder:7b-instruct-q8_0', 'created_at': '2025-09-29T06:20:48.203524Z', 'done': True, 'done_reason': 'stop', 'total_duration': 7089659708, 'load_duration': 63446958, 'prompt_eval_count': 537, 'prompt_eval_duration': 1636961500, 'eval_count': 56, 'eval_duration': 5353350166, 'model_name': 'qwen2.5-coder:7b-instruct-q8_0'}, id='run--ac7fd276-b2d7-4c73-936a-684fbbf3e946-0', usage_metadata={'input_tokens': 537, 'output_tokens': 56, 'total_tokens': 593})]}

#### Add custom message tool message

In [None]:
from pydantic import BaseModel, Field
from typing import Literal
from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy

class MeetingAction(BaseModel):
    """Action items extracted from a meeting transcript."""
    task: str = Field(description="The specific task to be completed")
    assignee: str = Field(description="Person responsible for the task")
    priority: Literal["low", "medium", "high"] = Field(description="Priority level")
model = create_model(local_model_id, True)
agent = create_agent(
    model=model,
    tools=[],
    response_format=ToolStrategy(
        schema=MeetingAction,
        tool_message_content="Action item captured and added to meeting notes!"
    )
)

result = agent.invoke({
    "messages": [{"role": "user", "content": "From our meeting: Sarah needs to update the project timeline as soon as possible"}]
})

result["messages"][1].content

AIMessage(content='{\n  "name": "MeetingAction",\n  "arguments": {\n    "task": "update the project timeline",\n    "assignee": "Sarah",\n    "priority": "high"\n  }\n}', additional_kwargs={}, response_metadata={'model': 'qwen2.5-coder:7b-instruct-q8_0', 'created_at': '2025-09-29T06:03:18.942758Z', 'done': True, 'done_reason': 'stop', 'total_duration': 5015568167, 'load_duration': 59698667, 'prompt_eval_count': 225, 'prompt_eval_duration': 1138130416, 'eval_count': 43, 'eval_duration': 3811779625, 'model_name': 'qwen2.5-coder:7b-instruct-q8_0'}, id='run--ad5599d5-6232-462b-818d-dcd975cf4dc5-0', usage_metadata={'input_tokens': 225, 'output_tokens': 43, 'total_tokens': 268})

#### Structured Output Error Handling

##### Schema validations errors

In [None]:
from pydantic import BaseModel, Field
from typing import Optional
from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy

class ProductRating(BaseModel):
    rating: Optional[int] = Field(description="Rating from 1-5", ge=1, le=5)
    comment: str = Field(description="Review comment")
    # uml: list[str] = Field

model = create_model(local_model_id,True)
agent = create_agent(
    model=model,
    tools=[],
    response_format=ToolStrategy(ProductRating),  # Default: handle_errors=True
    prompt="You are a helpful assistant that parses product reviews. Do not make any field or value up."
)

result = agent.invoke({
    "messages": [{"role": "user", "content": "Parse this: Amazing product, 10/10!"}]
})
result["messages"][1].content

'{"name": "ProductRating", "arguments": {"rating": 5, "comment": "Amazing product, 10/10!"}}'

##### Define errors to retry the model

In [144]:
ToolStrategy(
    schema=ProductRating,
    handle_errors=(ValueError, TypeError)  # Retry on ValueError and TypeError otherwise raise the exception
)

ToolStrategy(schema=<class '__main__.ProductRating'>, schema_specs=[_SchemaSpec(schema=<class '__main__.ProductRating'>, name='ProductRating', description='', schema_kind='pydantic', json_schema={'properties': {'rating': {'anyOf': [{'maximum': 5, 'minimum': 1, 'type': 'integer'}, {'type': 'null'}], 'description': 'Rating from 1-5', 'title': 'Rating'}, 'comment': {'description': 'Review comment', 'title': 'Comment', 'type': 'string'}}, 'required': ['rating', 'comment'], 'title': 'ProductRating', 'type': 'object'}, strict=False)], tool_message_content=None, handle_errors=(<class 'ValueError'>, <class 'TypeError'>))

##### Custom error handler

In [148]:
from typing import Union
from langchain.agents.structured_output import MultipleStructuredOutputsError, StructuredOutputValidationError 

class EventDetails(BaseModel):
    location: str = Field(description="Location of the event")
    description: str = Field(description="Description of the event")


def custom_error_handler(error: Exception) -> str:
    if isinstance(error, StructuredOutputValidationError):
        return "There was an issue with the format. Try again."
    elif isinstance(error, MultipleStructuredOutputsError):
        return "Multiple structured outputs were returned. Pick the most relevant one."
    else:
        return f"Error: {str(error)}"

ToolStrategy(
    schema=ToolStrategy(Union[ProductReview, EventDetails]),
    handle_errors=custom_error_handler
)

AttributeError: 'ToolStrategy' object has no attribute '__mro__'

### ToolNode with error handling

In [159]:
from langchain.agents import ToolNode

tool_node = ToolNode(
    tools=[search, calculate],
    handle_tool_errors="Please check your input and try again."
)
agent = create_agent(model, tools=tool_node)

query = create_initial_message(input())
if query:
    result = agent.invoke(query)

    print(result["messages"][1].content)
    print(result["messages"][1].tool_calls)

{
  "name": "search",
  "arguments": {
    "query": "rime"
  }
}
[]


## Setup DB, RAG and vectorstore

### RAG from website

In [None]:
%pip install -q markdownify
import requests
from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from markdownify import markdownify

ALLOWED_DOMAINS = ["https://langchain-ai.github.io/"]
LLMS_TXT = 'https://langchain-ai.github.io/langgraph/llms.txt'


@tool
def fetch_documentation(url: str) -> str:  
    """Fetch and convert documentation from a URL"""
    if not any(url.startswith(domain) for domain in ALLOWED_DOMAINS):
        return (
            "Error: URL not allowed. "
            f"Must start with one of: {', '.join(ALLOWED_DOMAINS)}"
        )
    response = requests.get(url, timeout=10.0)
    response.raise_for_status()
    return markdownify(response.text)


# We will fetch the content of llms.txt, so this can
# be done ahead of time without requiring an LLM request.
llms_txt_content = requests.get(LLMS_TXT).text

# System prompt for the agent
system_prompt = f"""
You are an expert Python developer and technical assistant.
Your primary role is to help users with questions about LangGraph and related tools.

Instructions:

1. If a user asks a question you're unsure about — or one that likely involves API usage,
   behavior, or configuration — you MUST use the `fetch_documentation` tool to consult the relevant docs.
2. When citing documentation, summarize clearly and include relevant context from the content.
3. Do not use any URLs outside of the allowed domain.
4. If a documentation fetch fails, tell the user and proceed with your best expert understanding.

You can access official documentation from the following approved sources:

{llms_txt_content}

You MUST consult the documentation to get up to date documentation
before answering a user's question about LangGraph.

Your answers should be clear, concise, and technically accurate.
"""

tools = [fetch_documentation]

model = init_chat_model("claude-sonnet-4-0", max_tokens=32_000)

agent = create_react_agent(
    model=model,
    tools=tools,  
    prompt=system_prompt,  
    name="Agentic RAG",
)

response = agent.invoke({
    'messages': [{
        'role': 'user',
        'content': (
            "Write a short example of a langgraph agent using the "
            "prebuilt create react agent. the agent should be able "
            "to look up stock pricing information."
        )
    }]
})

print(response['messages'][-1].content)

### Setup chromaDB for local RAG

In [None]:
%pip install -q chromadb

import chromadb
chroma_client = chromadb.Client()

# switch `create_collection` to `get_or_create_collection` to avoid creating a new collection every time
collection = chroma_client.get_or_create_collection(name="my_collection")

# switch `add` to `upsert` to avoid adding the same documents every time
collection.upsert(
    documents=[
        "This is a document about pineapple",
        "This is a document about oranges"
    ],
    ids=["id1", "id2"]
)

results = collection.query(
    query_texts=["This is a query document about florida"], # Chroma will embed this for you
    n_results=2 # how many results to return
)

print(results)

### RAG - Local source

In [None]:
%pip install langchain-chroma



from langchain_community.document_loaders import TextLoader
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import CharacterTextSplitter

# Load the document, split it into chunks, embed each chunk and load it into the vector store.
raw_documents = TextLoader('state_of_the_union.txt').load()

query = input()

response = model.invoke(query)
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = text_splitter.split_documents(raw_documents)

from langchain_chroma import Chroma
chroma_client = chromadb.Client()

collection_name = "user"
chroma_client.get_or_create_collection(name=collection_name)
chroma_client.a

db = Chroma.from_documents(documents, OpenAIEmbeddings())

query = "What did the president say about Ketanji Brown Jackson"
docs = db.similarity_search(query)
print(docs[0].page_content)

### Create In memory vector store

In [None]:
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
  os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

from langchain_core.vectorstores import InMemoryVectorStore

vector_store = InMemoryVectorStore(embeddings)
vector_store.add_texts("This is a cool text")
vector_store.add_documents()


## Human in the loop

### Setup middleware and tool configuration

In [105]:
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver

def write_file_tool():
    return "succes"

def execute_sql_tool():
    return "succes"

def read_data_tool():
    return "succes"


tool_cfg = tool_configs={
                "write_file": {
                    "allow_accept": True,
                    "allow_edit": True,
                    "allow_respond": True,
                    "description": "⚠️ File write operation requires approval",
                },
                "execute_sql": {
                    "allow_accept": True,
                    "description": "🚨 SQL execution requires careful review",
                },
                "read_data": False,  # Safe operation, no approval needed
            }

hitl = HumanInTheLoopMiddleware(tool_cfg, description_prefix="Tool execution pending approval")

agent = create_agent(
    model=create_model(),
    tools=[write_file_tool, execute_sql_tool, read_data_tool],
    middleware=[hitl],
    checkpointer=InMemorySaver(),  # Required for interrupts
)

ValueError: Function must have a docstring if description not provided.

#### Test hitl middleware

In [106]:
from langchain_core.messages import HumanMessage
from langgraph.types import Command
from langchain_core.runnables import RunnableConfig

# Initial invocation
config = RunnableConfig({"run_name": "Human_in_the_loop", "thread_id": "1"})
result = agent.invoke(
    {
        "messages": [HumanMessage("Delete old records from the database")],
    },
    config
)

# Check if paused for approval
state = agent.get_state(config)
if state.next:
    request = state["__interrupt__"].value[0]["action_request"]

    # Display tool details to human
    print("Tool:", request["action"])
    print("Arguments:", request["args"])

    # Resume with approval decision
    agent.invoke(
        Command(
            resume=[{"type": "accept"}]  # or "edit", "ignore", "response"
        ),
        config=config
    )

OperationalError: the connection is closed

#### Human-in-the-loop wrapping tool

In [None]:
from typing import Callable
from langchain_core.tools import BaseTool, tool as create_tool
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt
from langgraph.prebuilt.interrupt import HumanInterruptConfig, HumanInterrupt

def add_human_in_the_loop(
    tool: Callable | BaseTool,
    *,
    interrupt_config: HumanInterruptConfig = None,
) -> BaseTool:
    """Wrap a tool to support human-in-the-loop review."""
    if not isinstance(tool, BaseTool):
        tool = create_tool(tool)

    if interrupt_config is None:
        interrupt_config = {
            "allow_accept": True,
            "allow_edit": True,
            "allow_respond": True,
        }

    @create_tool(  # (1)!
        tool.name,
        description=tool.description,
        args_schema=tool.args_schema
    )
    def call_tool_with_interrupt(config: RunnableConfig, **tool_input):
        request: HumanInterrupt = {
            "action_request": {
                "action": tool.name,
                "args": tool_input
            },
            "config": interrupt_config,
            "description": "Please review the tool call"
        }
        response = interrupt([request])[0]  # (2)!
        # approve the tool call
        if response["type"] == "accept":
            tool_response = tool.invoke(tool_input, config)
        # update tool call args
        elif response["type"] == "edit":
            tool_input = response["args"]["args"]
            tool_response = tool.invoke(tool_input, config)
        # respond to the LLM with user feedback
        elif response["type"] == "response":
            user_feedback = response["args"]
            tool_response = user_feedback
        else:
            raise ValueError(f"Unsupported interrupt response type: {response['type']}")

        return tool_response

    return call_tool_with_interrupt

## Display LangGraph flow 

Also checkout: https://docs.langchain.com/oss/python/langchain/ui

In [None]:
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

## Agent Memory

### Short Term memory

In [40]:

from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig


checkpointer = InMemorySaver() # Local
config: RunnableConfig = {"configurable": {"thread_id": "1"}}

agent = create_agent(
    model=select_model,
    tools=[],
    checkpointer=checkpointer,
)
from langchain_core.messages import HumanMessage

messages = {"messages": [HumanMessage(content="Hi! My name is Bob.")]}
agent.invoke(
    messages,
    config,
)
agent.invoke(
    messages,
    config,
)

check_point = checkpointer.get({"configurable": {"thread_id": "1"}})

print(check_point)
print(agent.get_state(config))




{'v': 4, 'ts': '2025-09-26T09:41:53.691544+00:00', 'id': '1f09abd0-8e08-6c90-8004-90cab8933058', 'channel_versions': {'__start__': '00000000000000000000000000000005.0.7601503170391442', 'messages': '00000000000000000000000000000006.0.07850529529450567', 'branch:to:agent': '00000000000000000000000000000006.0.07850529529450567'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000004.0.3017933414812394'}, 'agent': {'branch:to:agent': '00000000000000000000000000000005.0.7601503170391442'}}, 'updated_channels': ['messages'], 'channel_values': {'messages': [HumanMessage(content='Hi! My name is Bob.', additional_kwargs={}, response_metadata={}, id='601dc69b-16c9-4478-97e9-0b1b0ef20b52'), AIMessage(content='Hello Bob! Nice to meet you. How can I assist you today?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 30, 'total_tokens': 46, 'completion_tokens_details': {'accepted_prediction_t

In [96]:
%pip install -q langgraph-checkpoint-postgres
%pip install -q "psycopg[binary,pool]"
from langchain.agents import create_agent
from langgraph.checkpoint.postgres import PostgresSaver
# python -m langgraph.checkpoint.postgres.migrate --url postgresql://postgres:postgres@localhost:5432/postgres


DB_URI = "postgresql://thomas:postgres@localhost:5432/postgres?sslmode=disable"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()
    model = create_model()
    agent = create_agent(
        model,
        [],
        checkpointer=checkpointer,
    )

    thread_id = get_next_thread_id()
    agent.invoke(
        {"messages": [{"role": "user", "content": "Hi! My name is Bob."}]},
        {"configurable": {"thread_id": thread_id}}
    )

    check_point = checkpointer.get({"configurable": {"thread_id": thread_id}})
    print_dictionary(check_point)


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


UndefinedFunction: operator does not exist: text = smallint
LINE 26: from checkpoints WHERE thread_id = $1 AND checkpoint_ns = $2...
                                          ^
HINT:  No operator matches the given name and argument types. You might need to add explicit type casts.

In [111]:
%pip install -q langmem
from langmem.short_term import SummarizationNode, RunningSummary
from langchain_core.messages.utils import count_tokens_approximately
from langchain.agents import create_agent, AgentState
from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig


summarization_node = SummarizationNode(
    token_counter=count_tokens_approximately,
    model=model,
    max_tokens=384,
    max_summary_tokens=128,
    output_messages_key="llm_input_messages",
)

class State(AgentState):
    # Added for the SummarizationNode to be able to keep track of the running summary information
    context: dict[str, RunningSummary]


checkpointer = InMemorySaver()

agent = create_agent(
    model=model,
    tools=[],
    pre_model_hook=summarization_node,
    state_schema=State,
    checkpointer=checkpointer,
)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}

agent.invoke({"messages": "hi, my name is bob"}, config)
agent.invoke({"messages": "write a short poem about cats"}, config)
agent.invoke({"messages": "now do the same but for dogs"}, config)
final_response = agent.invoke({"messages": "what's my name?"}, config)

print(final_response.keys())

final_response["messages"][-1].pretty_print()
print("\nSummary:", final_response["context"]["running_summary"].summary)

Note: you may need to restart the kernel to use updated packages.
dict_keys(['messages'])

Your name is Bob. How can I assist you further?


KeyError: 'context'

#### Long term Memory

In [None]:
from langchain_core.runnables import RunnableConfig
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore
from langgraph.store.base import BaseStore

model = init_chat_model(model="anthropic:claude-3-5-haiku-latest")

DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"

with (
    PostgresStore.from_conn_string(DB_URI) as store,
    PostgresSaver.from_conn_string(DB_URI) as checkpointer,
):
    # store.setup()
    # checkpointer.setup()

    def call_model(
        state: MessagesState,
        config: RunnableConfig,
        *,
        store: BaseStore,
    ):
        user_id = config["configurable"]["user_id"]
        namespace = ("memories", user_id)
        memories = store.search(namespace, query=str(state["messages"][-1].content))
        info = "\n".join([d.value["data"] for d in memories])
        system_msg = f"You are a helpful assistant talking to the user. User info: {info}"

        # Store new memories if the user asks the model to remember
        last_message = state["messages"][-1]
        if "remember" in last_message.content.lower():
            memory = "User name is Bob"
            store.put(namespace, str(uuid.uuid4()), {"data": memory})

        response = model.invoke(
            [{"role": "system", "content": system_msg}] + state["messages"]
        )
        return {"messages": response}

    builder = StateGraph(MessagesState)
    builder.add_node(call_model)
    builder.add_edge(START, "call_model")

    graph = builder.compile(
        checkpointer=checkpointer,
        store=store,
    )

    config = {
        "configurable": {
            "thread_id": "1",
            "user_id": "1",
        }
    }
    for chunk in graph.stream(
        {"messages": [{"role": "user", "content": "Hi! Remember: my name is Bob"}]},
        config,
        stream_mode="values",
    ):
        chunk["messages"][-1].pretty_print()

    config = {
        "configurable": {
            "thread_id": "2",
            "user_id": "1",
        }
    }

    for chunk in graph.stream(
        {"messages": [{"role": "user", "content": "what is my name?"}]},
        config,
        stream_mode="values",
    ):
        chunk["messages"][-1].pretty_print()

#### Long term memory with semantic search

In [None]:
from typing import Optional

from langchain.embeddings import init_embeddings
from langchain.chat_models import init_chat_model
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.graph import START, MessagesState, StateGraph

llm = init_chat_model("openai:gpt-4o-mini")

# Create store with semantic search enabled
embeddings = init_embeddings("openai:text-embedding-3-small")
store = InMemoryStore(
    index={
        "embed": embeddings,
        "dims": 1536,
    }
)

store.put(("user_123", "memories"), "1", {"text": "I love pizza"})
store.put(("user_123", "memories"), "2", {"text": "I am a plumber"})

def chat(state, *, store: BaseStore):
    # Search based on user's last message
    items = store.search(
        ("user_123", "memories"), query=state["messages"][-1].content, limit=2
    )
    memories = "\n".join(item.value["text"] for item in items)
    memories = f"## Memories of user\n{memories}" if memories else ""
    response = llm.invoke(
        [
            {"role": "system", "content": f"You are a helpful assistant.\n{memories}"},
            *state["messages"],
        ]
    )
    return {"messages": [response]}


builder = StateGraph(MessagesState)
builder.add_node(chat)
builder.add_edge(START, "chat")
graph = builder.compile(store=store)

for message, metadata in graph.stream(
    input={"messages": [{"role": "user", "content": "I'm hungry"}]},
    stream_mode="messages",
):
    print(message.content, end="")

## Multi-agent

### Orchestrator

In [114]:
from langchain.tools import tool
from langchain.agents import create_agent

subagent1 = create_agent(..)

@tool(
    name="subagent1_name",
    description="subagent1_description"
)
def call_subagent1(query: str):
    result = subagent1.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return result["messages"].text

agent = create_agent(..., tools=[call_subagent1])

SyntaxError: invalid syntax (3163895073.py, line 4)

#### Validate input

In [116]:
from typing import Annotated
from langchain.agents import AgentState
from langchain.agents.tool_node import InjectedState

@tool(
    name="subagent1_name",
    description="subagent1_description"
)
def call_subagent1(query: str, state: Annotated[CustomState, InjectedState]):
    # Apply any logic needed to transform the messages into a suitable input
    subagent_input = some_logic(query, state.messages)
    result = subagent1.invoke({
        "messages": subagent_input,
        # You could also pass other state keys here as needed.
        # Make sure to define these in both the main and subagent's
        # state schemas.
        "example_state_key": state.example_state_key
    })
    return result["messages"][-1].text


TypeError: tool() got an unexpected keyword argument 'name'

#### Format output

In [None]:
from typing import Annotated
from langchain.agents import AgentState
from langchain_core.tools import InjectedToolCallId
from langgraph.types import Command


@tool(
    name="subagent1_name",
    description="subagent1_description"
)
# We need to pass the `tool_call_id` to the sub agent so it can use it to respond with the tool call result
def call_subagent1(
    query: str,
    tool_call_id: Annotated[str, InjectedToolCallId],
# You need to return a `Command` object to include more than just a final tool call
) -> Command:
    result = subagent1.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return Command(update={
        # This is the example state key we are passing back
        "example_state_key": result["example_state_key"],
        "messages": [
            ToolMessage(
                # We need to include the tool call id so it matches up with the right tool call
                result["messages"][-1].text, tool_call_id=tool_call_id
            )
        ]
    })

## MCP

In [None]:
%pip install -q "langgraph-api>=0.2.3" "langgraph-sdk>=0.1.61"
%pip install -q langchain-mcp-adapters

In [None]:
# Create server parameters for stdio connection
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
import asyncio

from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent

server_params = {
    "url": "https://mcp-finance-agent.xxx.us.langgraph.app/mcp",
    "headers": {
        "X-Api-Key":"lsv2_pt_your_api_key"
    }
}

async def main():
    async with streamablehttp_client(**server_params) as (read, write, _):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()

            # Load the remote graph as if it was a tool
            tools = await load_mcp_tools(session)

            # Create and run a react agent with the tools
            agent = create_react_agent("openai:gpt-4.1", tools)

            # Invoke the agent with a message
            query = create_initial_message("What can the finance agent do for me?")
            agent_response = await agent.ainvoke(query)
            print(agent_response)
            print_dictionary(agent_response)

if __name__ == "__main__":
    asyncio.run(main())

### Local MCP server

In [92]:
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI


path_math = "/Users/thomas/Desktop/Agentic-AI/LangChain/math_server.py"
client = MultiServerMCPClient(
    {
        "math": {
            "transport": "stdio",  # Local subprocess communication
            "command": "python",
            # Absolute path to your math_server.py file
            "args": [path_math],
        },
        "weather": {
            "transport": "streamable_http",  # HTTP-based remote server
            # Ensure you start your weather server on port 8000
            "url": "http://localhost:8000/mcp",
        }
    }
)

tools = await client.get_tools()

print(tools)
model = create_model()
agent = create_agent(
    model, 
    tools=tools,
    prompt=SystemMessage(content="You are a personal assistant. That can use tools")
)



math_request = create_initial_message("what's (3 + 5) x 12?")

math_response = await agent.ainvoke(math_request, thread_id="session_1")

weather_request = create_initial_message("what is the weather in nyc?")
weather_response = await agent.ainvoke(weather_request,thread_id="session_2")


print_dictionary(math_response)
print_dictionary(weather_response)



[StructuredTool(name='add', description='Add two numbers', args_schema={'type': 'object', 'properties': {'a': {'type': 'number', 'description': 'First number'}, 'b': {'type': 'number', 'description': 'Second number'}}, 'required': ['a', 'b']}, response_format='content_and_artifact', coroutine=<function convert_mcp_tool_to_langchain_tool.<locals>.call_tool at 0x1119c4180>), StructuredTool(name='subtract', description='Subtract two numbers', args_schema={'type': 'object', 'properties': {'a': {'type': 'number', 'description': 'First number'}, 'b': {'type': 'number', 'description': 'Second number'}}, 'required': ['a', 'b']}, response_format='content_and_artifact', coroutine=<function convert_mcp_tool_to_langchain_tool.<locals>.call_tool at 0x11209cf40>), StructuredTool(name='multiply', description='Multiply two numbers', args_schema={'type': 'object', 'properties': {'a': {'type': 'number', 'description': 'First number'}, 'b': {'type': 'number', 'description': 'Second number'}}, 'required':

### MCP toolNode

In [None]:
from langchain_mcp_adapters.client import MultiServerMCPClient

def mcp_tools_node(state, config):
    user = config["configurable"].get("langgraph_auth_user")
         , user["github_token"], user["email"], etc.

    client = MultiServerMCPClient({
        "github": {
            "transport": "streamable_http", # (1)
            "url": "https://my-github-mcp-server/mcp", # (2)
            "headers": {
                "Authorization": f"Bearer {user['github_token']}"
            }
        }
    })
    tools = await client.get_tools() # (3)

    # Your tool-calling logic here

    tool_messages = 
    return {"messages": tool_messages}

In [None]:
%pip install mcp

