Demo - limiting the number of tokens used

In [None]:
from typing import List, Annotated
from langgraph.graph import StateGraph, MessagesState, add_messages, START, END
from langchain_core.messages import (
    HumanMessage, 
    AIMessage, 
    SystemMessage,
    AnyMessage,
    RemoveMessage,
    trim_messages
)
from langchain_openai import ChatOpenAI
from IPython.display import Image, display

**Single Node Workflow**

In [None]:
# Load settings from a local .env file before the model client is created.
# NOTE(review): presumably this is where the OpenAI API key comes from — confirm.
from dotenv import load_dotenv
load_dotenv()
# Chat model shared by the `llm_node` functions defined further down.
# temperature=0.0 is set explicitly for this demo.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.0,
)

In [None]:
def llm_node(state: MessagesState):
    """Call the chat model with the whole message history.

    Args:
        state: Graph state; only ``state["messages"]`` is read.

    Returns:
        A partial state update carrying the model's reply under ``messages``.
    """
    reply = llm.invoke(state["messages"])
    return {"messages": reply}

In [None]:
# Single-node graph: START -> llm_node -> END, using MessagesState as the state schema.
workflow = StateGraph(MessagesState)
workflow.add_node("llm_node", llm_node)
workflow.add_edge(START, "llm_node")
workflow.add_edge("llm_node", END)
graph = workflow.compile()

# Show the compiled graph inline as a Mermaid-rendered PNG.
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

**Useful Messages List**

In [None]:
# Canned conversation reused by every section of this notebook: a system
# prompt, two completed user/assistant exchanges and a final user question
# that has no answer yet. Each message carries an explicit id so that later
# cells can refer to individual messages (e.g. when building RemoveMessage
# entries).
messages = [
    SystemMessage(
        content="You're a FinTech specialist. You're not allowed to talk about anything else. Be concise in your answers.",
        name="System",
        id="0",
    ),
    HumanMessage(
        content="What is Pokemon?",
        name="User",
        id="1"
    ),
    AIMessage(
        content="I'm here to provide information specifically about FinTech. If you have "
                "any questions related to financial technology, such as digital payments, "
                "blockchain, cryptocurrencies, or financial services innovations, feel free "
                "to ask!",
        name="FintechAssistant",
        id="2",
    ),
    HumanMessage(
        content="What is BlockChain?",
        name="User",
        id="3"
    ),
    AIMessage(
        # Adjacent string literals are concatenated verbatim, so every piece
        # except the last must end with a space ("records " was missing one).
        content="Blockchain is a decentralized digital ledger technology that records "
                "transactions across multiple computers in a way that ensures the security, "
                "transparency, and integrity of the data. Each transaction is grouped into "
                "a block, and these blocks are linked together in chronological order to form "
                "a chain, hence the name blockchain.",
        name="FintechAssistant",
        id="4",
    ),
    HumanMessage(
        content="What is a credit card fraud?",
        name="User",
        id="5"
    ),
]

**Filter Messages during Invocation**

In [None]:
# Baseline run: send the complete `messages` list through the graph.
complete_output = graph.invoke({"messages": messages})

In [None]:
# Print the last message of the returned state (the reply added by llm_node).
complete_output["messages"][-1].pretty_print()

In [None]:
# Token usage recorded on the reply for the full-history run.
complete_output["messages"][-1].response_metadata["token_usage"]

In [None]:
# Same graph, but invoked with only the first message (system prompt) and the
# last message (latest user question); the exchanges in between are left out.
filtered_output = graph.invoke({"messages": [messages[0], messages[-1]]})

In [None]:
# Print the reply produced from the reduced two-message input.
filtered_output["messages"][-1].pretty_print()

In [None]:
# Token usage for the reduced input, for comparison with the full-history run.
filtered_output["messages"][-1].response_metadata["token_usage"]

Filter Messages inside a node

In [None]:
class State(MessagesState):
    """MessagesState extended with a second message list, ``filtered_messages``."""
    # Annotated with add_messages, so node updates to this key go through that reducer.
    filtered_messages: Annotated[List[AnyMessage], add_messages]

In [None]:
def llm_node(state: State):
    """Call the chat model with only the three most recent messages.

    Args:
        state: Graph state; only ``state["messages"]`` is read.

    Returns:
        A partial state update with the reply under ``messages`` and, under
        ``filtered_messages``, the three messages that were sent followed by
        the reply.
    """
    recent = state["messages"][-3:]
    reply = llm.invoke(recent)
    return {
        "messages": reply,
        "filtered_messages": [*recent, reply],
    }

In [None]:
# Rebuild the single-node graph on the extended State so that llm_node can
# write to `filtered_messages` as well as `messages`.
workflow = StateGraph(State)
workflow.add_node("llm_node", llm_node)
workflow.add_edge(START, "llm_node")
workflow.add_edge("llm_node", END)
graph = workflow.compile()

# Show the compiled graph inline as a Mermaid-rendered PNG.
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
# Print the full input conversation for reference.
for m in messages:
    m.pretty_print()

In [None]:
# Run the graph on the full conversation, then print `filtered_messages`,
# the list that llm_node built from the last three messages plus its reply.
output = graph.invoke({'messages': messages})
for m in output['filtered_messages']:
    m.pretty_print()

**Remove Messages**

In [None]:
# Preview the deletion candidates: every message except the last three.
messages[:-3]

In [None]:
# One RemoveMessage per deletion candidate, matched to its target by id.
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-3]]
# Display what add_messages returns for the original list plus the removals.
# The result is only shown; `messages` is not reassigned here.
add_messages(messages, delete_messages)

In [None]:
class State(MessagesState):
    """MessagesState extended with a second message list, ``filtered_messages``.

    NOTE(review): identical to the ``State`` class defined in the
    "Filter Messages inside a node" section; this redefinition shadows it.
    """
    # Annotated with add_messages, so node updates to this key go through that reducer.
    filtered_messages: Annotated[List[AnyMessage], add_messages]

In [None]:
def removal_filter(state: State):
    """Build ``filtered_messages`` by dropping the older non-system messages.

    Every message except the last three is marked for deletion with a
    ``RemoveMessage`` unless it is a system message. A message counts as a
    system message if it is a ``SystemMessage`` instance or carries the name
    ``"System"``; checking the type as well means a system prompt created
    without a ``name`` is still preserved.

    Args:
        state: Graph state; only ``state["messages"]`` is read.

    Returns:
        A partial state update whose ``filtered_messages`` value is the result
        of applying the removals to ``state["messages"]`` with ``add_messages``.
    """
    removals = [
        RemoveMessage(id=m.id)
        for m in state["messages"][:-3]
        # `name` is optional on messages, so it cannot be the only test.
        if not isinstance(m, SystemMessage) and m.name != "System"
    ]
    return {
        "filtered_messages": add_messages(
            state["messages"],
            removals,
        )
    }

In [None]:
def llm_node(state: State):
    """Call the chat model with the already-filtered message list.

    Args:
        state: Graph state; only ``state["filtered_messages"]`` is read.

    Returns:
        A partial state update carrying the reply under ``filtered_messages``.
    """
    reply = llm.invoke(state["filtered_messages"])
    return {"filtered_messages": reply}

In [None]:
# Two-node graph: START -> removal_filter -> llm_node -> END.
# removal_filter fills `filtered_messages`; llm_node then reads that key.
workflow = StateGraph(State)
workflow.add_node("llm_node", llm_node)
workflow.add_node("removal_filter", removal_filter)
workflow.add_edge(START, "removal_filter")
workflow.add_edge("removal_filter", "llm_node")
workflow.add_edge("llm_node", END)

graph = workflow.compile()

# Show the compiled graph inline as a Mermaid-rendered PNG.
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
# Print the full input conversation for reference.
for m in messages:
    m.pretty_print()

In [None]:
# Run the removal graph and print the resulting `filtered_messages` list
# (written by removal_filter, then extended with llm_node's reply).
output = graph.invoke({'messages': messages})
for m in output['filtered_messages']:
    m.pretty_print()

Trim Messages

In [None]:
# Standalone trim_messages call with a 30-token budget and strategy="last".
# Tokens are counted with the chat model itself (token_counter=llm).
# NOTE(review): per the trim_messages docs, include_system=True keeps the
# system message and allow_partial=False avoids splitting a message — confirm
# against the installed langchain-core version.
trim_messages(
    messages,
    max_tokens=30,
    strategy="last",
    token_counter=llm,
    allow_partial=False,
    include_system=True
)

In [None]:
class State(MessagesState):
    """MessagesState extended with a token budget and a trimmed message list."""
    # Token budget read by trim_filter; a falsy value there skips trimming.
    max_tokens: int
    # Annotated with add_messages, so node updates to this key go through that reducer.
    filtered_messages: Annotated[List[AnyMessage], add_messages]

In [None]:
def trim_filter(state: State):
    """Populate ``filtered_messages`` with a token-trimmed view of the history.

    Args:
        state: Graph state. ``state["messages"]`` is required;
            ``state["max_tokens"]`` is optional.

    Returns:
        A partial state update ``{"filtered_messages": ...}``. When
        ``max_tokens`` is absent or falsy (``None``/``0``) the messages are
        passed through unchanged; otherwise they are trimmed with
        ``trim_messages`` using ``strategy="last"``, ``include_system=True``
        and ``allow_partial=False``, counting tokens with ``llm``.
    """
    # The truthiness check below treats max_tokens as optional, so read it
    # with .get(): a caller that omits the key must not trigger a KeyError.
    max_tokens = state.get("max_tokens")
    messages = state["messages"]
    if not max_tokens:
        return {"filtered_messages": messages}

    trimmed = trim_messages(
        messages=messages,
        max_tokens=max_tokens,
        strategy="last",
        token_counter=llm,
        include_system=True,
        allow_partial=False,
    )
    return {"filtered_messages": trimmed}

In [None]:
def llm_node(state: State):
    """Call the chat model with the trimmed message list.

    Args:
        state: Graph state; only ``state["filtered_messages"]`` is read.

    Returns:
        A partial state update carrying the reply under ``filtered_messages``.
    """
    reply = llm.invoke(state["filtered_messages"])
    return {"filtered_messages": reply}

In [None]:
# Two-node graph: START -> trim_filter -> llm_node -> END.
# trim_filter fills `filtered_messages`; llm_node then reads that key.
workflow = StateGraph(State)
workflow.add_node("llm_node", llm_node)
workflow.add_node("trim_filter", trim_filter)
workflow.add_edge(START, "trim_filter")
workflow.add_edge("trim_filter", "llm_node")
workflow.add_edge("llm_node", END)

graph = workflow.compile()

# Show the compiled graph inline as a Mermaid-rendered PNG.
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
# Print the full input conversation for reference.
for m in messages:
    m.pretty_print()

In [None]:
# Run the trimming graph with a 50-token budget on the full conversation.
output = graph.invoke(
    input={
        "max_tokens": 50,
        "messages": messages
    }
)

In [None]:
# Print the messages kept after trimming, followed by the model's reply.
for m in output['filtered_messages']:
    m.pretty_print()

Summary

In [None]:
# Preview the slice that will be summarised: everything except the first and last message.
messages[1:-1]

In [None]:
# Leave out the first message (system prompt) and the last message (latest
# user question); the turns in between are what gets summarised.
messages_to_summarize = messages[1:-1]
# Instruction appended after those turns to request the summary.
summary_message = HumanMessage(
    content="Create a summary of the conversation above:", 
    name="User"
)
# Ask the model for the summary; add_messages combines the turns with the instruction.
ai_message = llm.invoke(
    add_messages(
        messages_to_summarize,
        summary_message
    )
)

In [None]:
# Show the text of the generated summary.
ai_message.content

In [None]:
# Give the summary and the latest question the ids "1" and "2".
# NOTE(review): the second assignment mutates the shared `messages[-1]` object
# in place, so `messages` now holds two entries with id "2" (the first AI reply
# and the last question). Re-running earlier cells that merge `messages` by id
# may then behave differently — confirm this is intended.
ai_message.id = "1"
messages[-1].id = "2"

In [None]:
# Condensed history: the first message (system prompt), the summary of the
# middle turns, and the last message (latest user question).
remaining_messages = [messages[0]] + [ai_message] + [messages[-1]]

In [None]:
# Inspect the condensed three-message history.
remaining_messages

In [None]:
# Ask the model to answer from the condensed history and append its reply.
# Note: this appends in place, so re-running the cell adds another reply.
remaining_messages.append(llm.invoke(remaining_messages))

In [None]:
# Print the condensed history together with the new reply.
for m in remaining_messages:
    m.pretty_print()

In [None]:
# Token usage recorded on the reply generated from the condensed history.
remaining_messages[-1].response_metadata["token_usage"]

Summary: Techniques for Limiting Messages to Save Tokens  
Overview  
This demo explores multiple techniques for limiting messages in LangGraph workflows. Reducing message history is important for saving tokens and optimizing performance in agentic applications that use LLMs over multi-turn conversations.
  
Key Steps Covered  
1. Initial Setup  
A single-node workflow is created with:
A system message ("You are a FinTech specialist").
Few-shot examples of human and AI messages (e.g., "What is Pokémon?" → refusal to answer).
messages = [
  SystemMessage(...),
  HumanMessage(...),
  AIMessage(...),
  HumanMessage(...),
]
Each message is assigned a unique ID for easier filtering later.  
2. Token Usage Without Trimming  
When invoking with the full message history:
Prompt tokens and completion tokens are relatively high (e.g., ~239 total tokens).
response = llm.invoke(messages)  
3. Simple Manual Filtering  
Manually invoking the LLM with only a subset of the messages (e.g., first and last message).
This substantially reduces token usage (e.g., ~96 total tokens).
response = llm.invoke([messages[0], messages[-1]])  
4. Filtering Inside the Node  
A custom state is created by extending MessagesState, including:
messages
filtered_messages
Inside the node, only the last three messages are passed to the LLM.
state = {"messages": [...], "filtered_messages": state["messages"][-3:]}
This avoids needing to manually slice messages each time.  
5. Using Remove Messages Strategy  
RemoveMessage entries (from langchain_core.messages) are passed through LangGraph's add_messages reducer to delete unwanted messages based on their IDs.
A deletion list is created to filter out irrelevant few-shot examples while preserving essential context.
from langchain_core.messages import RemoveMessage
from langgraph.graph import add_messages

delete_messages = [RemoveMessage(id=m.id) for m in messages[:-3]]
messages = add_messages(messages, delete_messages)
This helps refine conversation history efficiently.  
6. Using Trim Messages for Token Limits  
Trim strategy is introduced to limit messages by token budget:
Keeps only the latest messages that fit within a specified token limit.
The trim_messages() method is used with different max token thresholds.
trimmed = trim_messages(messages, max_tokens=250, strategy="last", token_counter=llm)
Behavior:

Higher token limits keep more conversation history.
Lower token limits progressively discard older messages.
Examples:

250 tokens → retains last two messages.
30 tokens → retains only system message.  
7. Summarization to Compress Messages  
Messages between the initial system message and the latest user query are summarized.
Summarization is prompted by appending a special HumanMessage: "Create a summary of the conversation above:"
This produces a concise summary that replaces multiple older turns.
summarized_message = llm.invoke(summary_prompt)
The resulting list:

SystemMessage
Summary AIMessage
Most recent HumanMessage
Token usage drops significantly after summarization.
  
8. Key Concepts Highlighted  
Manual slicing reduces input size but needs management.
Automatic filtering inside nodes allows persistent behavior.
The add_messages reducer, combined with RemoveMessage entries, provides fine-grained control.
Trimming by token count ensures fitting into LLM token limits dynamically.
Summarization reduces message volume while retaining conversation context.  
9. Conclusion  
Efficient management of conversation history is essential for scalable LLM applications.
LangGraph offers flexible techniques to balance memory retention with token usage constraints.
Combining filtering, trimming, and summarization enables smooth long-running agentic workflows.

L3_demo_07_multiple_schemas

In [None]:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display

Hidden Layer

In [None]:
class ProcessState(TypedDict):
    """State schema the graph below is built with: an input and an output string."""
    # Read by node_a.
    input: str
    # Written by node_b.
    output: str

In [None]:
class HiddenState(TypedDict):
    """Intermediate schema returned by node_a and read by node_b."""
    # Not a field of ProcessState as defined in this section.
    thought: str

In [None]:
def node_a(state: ProcessState) -> HiddenState:
    """Log the incoming input and emit a fixed ``thought``.

    Args:
        state: Mapping with an ``input`` entry.

    Returns:
        A dict with a single ``thought`` key.
    """
    received = state["input"]
    banner = f"NODE A:\n ->input:{received}\n "
    print(banner)
    return {"thought": "I don't know what to do with with this message"}

def node_b(state: HiddenState) -> ProcessState:
    """Log the hidden ``thought`` and emit the fixed customer-facing reply.

    Args:
        state: Mapping with a ``thought`` entry.

    Returns:
        A dict with a single ``output`` key.
    """
    thought = state["thought"]
    banner = f"NODE B:\n ->hidden_thought:{thought}\n "
    print(banner)
    reply = "Thank you for your message! We're processing it and get back to you soon!"
    return {"output": reply}

In [None]:
# Graph over ProcessState: START -> node_a -> node_b -> END.
# The nodes are added without explicit names; the edges refer to them by
# their function names ("node_a", "node_b").
workflow = StateGraph(ProcessState)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
graph = workflow.compile()
# Show the compiled graph inline as a Mermaid-rendered PNG.
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
# Run the graph with only `input` supplied and display the returned state.
graph.invoke({"input" : "The product doesn't work. I want my money back!"})

**StateGraph with Input and Output**
Three schemas

In [None]:
class InputState(TypedDict):
    """Input schema: what a caller passes to the graph."""
    input: str

class OutputState(TypedDict):
    """Output schema: what the graph hands back to the caller."""
    output: str

class ProcessState(TypedDict):
    """Full internal schema: input, hidden thought and output.

    NOTE(review): this redefines the ``ProcessState`` name from the
    "Hidden Layer" section, now with a ``thought`` field; the earlier class is
    shadowed from here on.
    """
    input: str
    thought: str
    output: str

In [None]:
def l1_agent(state: InputState):
    """Log the incoming input and produce a first reply plus a hidden thought.

    Args:
        state: Mapping with an ``input`` entry.

    Returns:
        A dict with the keys ``output`` and ``thought``.
    """
    received = state["input"]
    banner = f"NODE A:\n ->input:{received}\n "
    print(banner)
    update = {}
    update["output"] = "Thank you for your message!"
    update["thought"] = "An L2 Agent should take care of this"
    return update

def l2_agent(state: ProcessState) -> OutputState:
    """Log the L1 output and hidden thought, then extend the L1 reply.

    Args:
        state: Mapping with ``output`` and ``thought`` entries.

    Returns:
        A dict with a single ``output`` key: the L1 output followed by the
        fixed follow-up sentence.
    """
    previous_reply = state["output"]
    thought = state["thought"]
    banner = f"NODE B:\n ->l1_output:{previous_reply}\n ->hidden_thought:{thought}\n "
    print(banner)
    final_reply = f"{previous_reply} We're processing it and get back to you soon!"
    return {"output": final_reply}

In [None]:
# ProcessState is the internal schema; InputState and OutputState are passed
# as the graph's separate input and output schemas.
# NOTE(review): newer LangGraph releases appear to prefer the keyword names
# input_schema= / output_schema= — confirm against the installed version.
workflow = StateGraph(ProcessState, input=InputState, output=OutputState)
workflow.add_node(l1_agent)
workflow.add_node(l2_agent)
workflow.add_edge(START, "l1_agent")
workflow.add_edge("l1_agent", "l2_agent")
workflow.add_edge("l2_agent", END)
graph = workflow.compile()
# Show the compiled graph inline as a Mermaid-rendered PNG.
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
# Run the three-schema graph with only `input` supplied and display the result.
graph.invoke({"input" : "The product doesn't work. I want my money back!"})