# Add Long-term User Memory

LangGraph already provides graph persistence as a first-class feature. That's all you need for **thread**-level memory (aka chat history memory).

There are huge benefits to persisting memory **across threads**, however. This notebook shows how to implement a simple 'User Profile' type memory as an async process and connect it to your LangGraph.

The user profile can be any schema. We will naively overwrite the user profile any time a new thread is scheduled to process memories.

## Prerequisites

Let's install this project's prerequisites. We will use Claude for everything. Feel free to swap it out for any model that can reasonably perform function calling.

In [None]:
%%capture --no-stderr
# %pip install -U langgraph aiosqlite langchain_anthropic

In [1]:
%env LANGCHAIN_PROJECT=langgraph-long-term-memory

env: LANGCHAIN_PROJECT=langgraph-long-term-memory


## Memory DB

First, we will set up a table in our database to store user memories. For this how-to, we will use `sqlite` (since it requires little additional setup), but you can swap this out with postgres or whatever other database you'd like.
We will re-use this DB for our graph checkpointing.

Start by creating the memories table:

In [2]:
import aiosqlite

conn_string = ":memory:"
conn = aiosqlite.connect(conn_string)
await conn
async with conn.executescript(
    """
    CREATE TABLE IF NOT EXISTS core_memories (
        user_id TEXT NOT NULL,
        memory TEXT NOT NULL,
        PRIMARY KEY (user_id)
    );
    """
):
    await conn.commit()

Next, define the accessor methods. These just upsert or get the memory for a specific user.

In [3]:
import json


async def get_user_profile(conn, user_id):
    async with conn.execute(
        "SELECT memory FROM core_memories WHERE user_id = ?",
        (user_id,),
    ) as cursor:
        if value := await cursor.fetchone():
            memory_str = value[0]
            return json.loads(memory_str)
        return None


async def commit_user_profile(conn, user_id, profile):
    async with conn.execute(
        "INSERT OR REPLACE INTO core_memories (user_id, memory) VALUES (?, ?)",
        (
            user_id,
            profile.json(),
        ),
    ):
        await conn.commit()
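
To see the overwrite semantics in isolation, here is a minimal sketch that uses the standard-library `sqlite3` module as a synchronous stand-in for `aiosqlite`. The names `conn_demo`, `commit_profile_sync`, and `get_profile_sync` are hypothetical and not part of this notebook's pipeline, and profiles are plain dicts rather than `UserProfile` objects.

```python
import json
import sqlite3

# Synchronous stand-in for the aiosqlite connection used in this notebook.
conn_demo = sqlite3.connect(":memory:")
conn_demo.execute(
    "CREATE TABLE IF NOT EXISTS core_memories ("
    "user_id TEXT NOT NULL, memory TEXT NOT NULL, PRIMARY KEY (user_id))"
)


def commit_profile_sync(conn, user_id, profile: dict):
    # Same statement as commit_user_profile: because user_id is the primary
    # key, a second write replaces the existing row instead of adding one.
    conn.execute(
        "INSERT OR REPLACE INTO core_memories (user_id, memory) VALUES (?, ?)",
        (user_id, json.dumps(profile)),
    )
    conn.commit()


def get_profile_sync(conn, user_id):
    row = conn.execute(
        "SELECT memory FROM core_memories WHERE user_id = ?", (user_id,)
    ).fetchone()
    return json.loads(row[0]) if row else None


commit_profile_sync(conn_demo, "will", {"name": "Will", "interests": []})
commit_profile_sync(conn_demo, "will", {"name": "Will", "interests": ["memory"]})
latest_profile = get_profile_sync(conn_demo, "will")
row_count = conn_demo.execute("SELECT COUNT(*) FROM core_memories").fetchone()[0]
```

Each user keeps exactly one row, so whatever the extraction step writes last is the whole profile.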

Next, define the LLM to extract the user profile from threads. Feel free to customize this step! The key components are:

1. The memory schema to populate. We have a very simple schema that contains a list of core memories.
2. Formatted messages to ensure the LLM extracts memories about the user (rather than the assistant or other users)
3. Handling to load and save the memories to the DB.

Note that here we naively regenerate the whole profile on each invocation. We have found better results when the LLM instead emits JSON Patch operations to update the profile (after the initial generation), since that requires less work from the LLM and reduces unwanted deletions.
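
As a rough illustration of the patch-based alternative (this notebook does not implement it), the sketch below applies a small subset of JSON Patch operations (`add`, `replace`, `remove`) to a profile dict. The function name `apply_profile_patch` and the sample data are assumptions for illustration; a real setup would more likely use a dedicated JSON Patch library and have the LLM produce the operations.

```python
import copy


def apply_profile_patch(profile: dict, ops: list) -> dict:
    """Apply add/replace/remove operations to a copy of `profile`."""
    result = copy.deepcopy(profile)
    for op in ops:
        kind = op["op"]
        if kind not in ("add", "replace", "remove"):
            raise ValueError(f"Unsupported op: {kind}")
        # "/interests/0" -> ["interests", "0"]; pointer escapes (~0, ~1) are not handled.
        *parents, last = op["path"].split("/")[1:]
        target = result
        for key in parents:
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            # "-" addresses the position after the last element (append).
            index = len(target) if last == "-" else int(last)
            if kind == "add":
                target.insert(index, op["value"])
            elif kind == "replace":
                target[index] = op["value"]
            else:
                del target[index]
        elif kind == "remove":
            del target[last]
        else:
            target[last] = op["value"]
    return result


current = {"name": "will", "interests": ["testing"], "core_memories": []}
patch = [
    {"op": "replace", "path": "/name", "value": "Will"},
    {
        "op": "add",
        "path": "/core_memories/-",
        "value": "Is building long-term memory for the assistant",
    },
]
updated = apply_profile_patch(current, patch)
```

Because the model only describes what changed, fields it does not mention (here, `interests`) survive untouched.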

In [4]:
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import tool


class UserProfile(BaseModel):
    core_memories: list[str] | str | None = Field(
        ..., description="All core memories from this conversation."
    )
    interests: list[str] | str = Field(
        ...,
        description="Interests the user has expressed, like specific sports, hobbies, beliefs, etc.",
    )
    name: str | None = Field(..., description="The user's name (if shared)")
    age: int | None = Field(default=None, description="The user's age (if shared). Otherwise, null.")

prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """Below, you are given one or more conversations between {user_id} and an AI.

Use the provided function to save all salient information about user {user_id}.
Refrain from recording information about the AI or other users that is not directly relevant to user {user_id}.{current_user_state}""",
        ),
        ("placeholder", "{messages}"),
        (
            "user",
            "<moderator>Reflect on the above conversation and update the user profile based on {user_id}'s revelations.</moderator>",
        ),
    ]
)


_CURRENT_STATE_TEMPLATE = """
## Current User Profile
<profile>
{current_user_state}
</profile>

Your response will overwrite this profile, so please ensure to retain all information you don't
wish to lose. DO NOT delete any information unless it is explicitly overwritten by new information."""


async def prepare_inputs(inputs: dict):
    messages = inputs["messages"]
    user_id = inputs["user_id"]
    current_user_state = ""
    if current_profile := await get_user_profile(conn, user_id):
        current_user_state = _CURRENT_STATE_TEMPLATE.format(
            current_user_state=json.dumps(current_profile)
        )
    converted_messages = []
    for m in messages:
        if m.type == "human":
            # Note: this only handles string content
            content = f"<user id={user_id}>{m.content}</user>"
            m = m.__class__(**m.dict(exclude={"content"}), content=content)
        converted_messages.append(m)
    return {
        **inputs,
        "current_user_state": current_user_state,
        "messages": converted_messages,
    }


async def commit_extraction(pipe_output: dict):
    extracted = pipe_output["extracted"]
    user_id = pipe_output["user_id"]
    await commit_user_profile(conn, user_id, extracted)
    return f"Successfully committed: {extracted.json()} for user {user_id}"


# TODO: Add the retries + persistence. We got some fun tricks up our sleeve for extraction improvements
mem_llm = ChatAnthropic(model="claude-3-sonnet-20240229")
mem_chain = (
    prepare_inputs
    | RunnablePassthrough.assign(
        extracted=prompt | mem_llm.with_structured_output(UserProfile)
    )
    | commit_extraction
)


## Memory Manager

It's nice to not have to explicitly trigger memory consolidation after a given thread. In many scenarios, it's impossible to know if a thread has been fully completed!

As a balance, we will schedule a consolidation task (aka schedule calls to the `mem_chain` above) whenever a new set of messages is sent to the manager. If an update comes in while that process is still scheduled, we will reset the timer.
This reduces redundant calls to the LLM. Feel free to expand on these heuristics (only trigger after convo length has reached size X, only trigger for certain words, run a tiny model or embedding classifier to see if it should trigger, etc.)
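
The reset-the-timer idea is a debounce. Here is a minimal, self-contained sketch of that pattern alone, without locking or the LLM chain. The `Debouncer` class, its `submit` method, and the short delays are assumptions for illustration and are not the `MemoryManager` defined below.

```python
import asyncio


class Debouncer:
    """Run `handler` once per key after `delay` seconds without new submissions."""

    def __init__(self, handler, delay):
        self.handler = handler
        self.delay = delay
        self.pending = {}  # key -> asyncio.Task for the scheduled run

    def submit(self, key, payload):
        # A newer submission for the same key cancels the pending timer.
        if (old := self.pending.get(key)) is not None:
            old.cancel()
        self.pending[key] = asyncio.create_task(self._run(key, payload))

    async def _run(self, key, payload):
        await asyncio.sleep(self.delay)
        # Drop the entry first so a later submit does not cancel work in progress.
        self.pending.pop(key, None)
        await self.handler(key, payload)


async def demo():
    calls = []

    async def handler(key, payload):
        calls.append((key, payload))

    debouncer = Debouncer(handler, delay=0.05)
    debouncer.submit(("will", "convo 1"), ["msg 1"])
    await asyncio.sleep(0.01)
    # Arrives before the first timer fires, so only this payload is processed.
    debouncer.submit(("will", "convo 1"), ["msg 1", "msg 2"])
    await asyncio.sleep(0.2)
    return calls


# In a notebook with a running event loop, use `await demo()` instead.
debounced_calls = asyncio.run(demo())
```

Two submissions in quick succession result in a single handler call carrying the latest messages, which is the behavior that saves redundant LLM calls.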

In [5]:
import asyncio
import logging
logger = logging.getLogger("memory")


class MemoryManager:
    def __init__(self, mem_chain):
        self.mem_chain = mem_chain
        self.lock = asyncio.Lock()
        self.active_timers = {}

    async def enqueue_thread(self, user_id, thread_id, messages, delay=60):
        timer_key = (user_id, thread_id)

        if timer_key in self.active_timers:
            # Cancel the existing timer task
            async with self.lock:
                if timer_key in self.active_timers:
                    (task, _) = self.active_timers[timer_key]
                    task.cancel()

        async def schedule_ingestion():
            await asyncio.sleep(delay)
            try:
                await self.mem_chain.ainvoke({"messages": messages, "user_id": user_id})
            except Exception as e:
                logger.error(repr(e))
            async with self.lock:
                if timer_key in self.active_timers:
                    del self.active_timers[timer_key]

        # Create a new timer task
        task = asyncio.create_task(schedule_ingestion())
        async with self.lock:
            self.active_timers[timer_key] = (task, messages)

    async def trigger(self, user_id=None, thread_id=None):
        async def ingest(m, uid, tid):
            try:
                await self.mem_chain.ainvoke({"messages": m, "user_id": uid})
            except Exception as e:
                logger.error(repr(e))
            async with self.lock:
                # asyncio.Lock is not re-entrant; this runs in its own task, so it
                # waits for trigger() to release the lock rather than deadlocking
                if (uid, tid) in self.active_timers:
                    del self.active_timers[(uid, tid)]

        if user_id and thread_id:
            # Cancel the pending timer and trigger ingestion immediately
            timer_key = (user_id, thread_id)
            if timer_key in self.active_timers:
                async with self.lock:
                    res = self.active_timers.pop(timer_key, None)
                    if res is not None:
                        old_task, messages = res
                        old_task.cancel()
                        task = asyncio.create_task(ingest(messages, user_id, thread_id))
                        self.active_timers[timer_key] = (task, messages)
        elif user_id is not None:
            async with self.lock:
                new_tasks = {}
                for (uid, tid), (old_task, messages) in self.active_timers.items():
                    if uid == user_id:
                        task = asyncio.create_task(ingest(messages, user_id, tid))
                        new_tasks[(uid, tid)] = (task, messages)
                        old_task.cancel()
                for k, v in new_tasks.items():
                    self.active_timers[k] = v

        else:
            raise NotImplementedError()

In [6]:
manager = MemoryManager(mem_chain)

## Integrate in your chatbot


Define your chatbot below. The key additions are:

1. Fetch the user profile from the DB in the entry node. If not present, we don't format it in.
2. Schedule memory consolidation after the assistant has responded.

We haven't added any tools or looping here, but you could extend this to a zero-shot agent design (similar to that presented in the LangGraph tutorial).

In [7]:
bot_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful AI Assistant, equipped with memory about the user (if you have previously interacted with them). Use the core memories below to help shape your conversation.{user_info}",
        ),
        ("placeholder", "{messages}"),
    ]
)

bot = (
    bot_prompt
    | ChatAnthropic(model="claude-3-haiku-20240307")
    | (lambda x: {"messages": x})
)

In [8]:
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.runnables import RunnableConfig
from typing_extensions import Annotated
from typing import TypedDict
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver


class State(TypedDict):
    messages: Annotated[list, add_messages]
    user_info: str


builder = StateGraph(State)


async def fetch_profile(state: State, config: RunnableConfig):
    user_id = config["configurable"]["user_id"]
    profile_str = ""
    if current_profile := await get_user_profile(conn, user_id):
        profile_str = f"""

## User Profile
In prior conversations, you have noted the following preferences about the user:
<user_profile>
{current_profile}
</user_profile>
Use this as your long term memory of your interactions with the user,\
 use it to be a good friend to the user and not forget important information\
 about what they've shared. Use it liberally so the user knows you're paying attention. Be a good friend and use their name if you know it!"""
    return {"user_info": profile_str}


builder.add_node("fetch_profile", fetch_profile)


async def process_convo(state: State, config: RunnableConfig):
    user_id = config["configurable"]["user_id"]
    thread_id = config["configurable"]["thread_id"]
    delay = config["configurable"].get("delay") or 60
    await manager.enqueue_thread(user_id, thread_id, state["messages"], delay=delay)
    return {}


builder.add_node("process_convo", process_convo)
builder.set_entry_point("fetch_profile")
builder.add_node("bot", bot)
builder.add_edge("fetch_profile", "bot")
builder.add_edge("bot", "process_convo")
builder.set_finish_point("process_convo")
checkpointer = AsyncSqliteSaver(conn)
graph = builder.compile(checkpointer=checkpointer)

In [9]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass


In [10]:
async def chat(text: str, user_id: str, thread_id: str):
    events = graph.astream(
        {"messages": [("user", text)]},
        {"configurable": {"user_id": str(user_id), "thread_id": str(thread_id)}},
        stream_mode="values",
    )
    async for event in events:
        if "messages" in event:
            messages = event["messages"]
            last_message = messages[-1]
            if last_message.type == "ai":
                yield last_message

## Try it out

Now let's try interacting with the bot! If you send messages faster than the consolidation delay, the timer keeps resetting and memory never consolidates, so we will explicitly trigger consolidation at the end of the conversation.

In [11]:
user_id = "will"
thread_id = "convo 1"
async for msg in chat("Hi there, I'm will", user_id, thread_id):
    print(msg.content)

It's nice to meet you, Will! I'm Claude, an AI assistant. I don't have any specific memories about you from prior conversations, but I'm happy to chat and try my best to be helpful. Please let me know if you have any questions or if there is anything I can assist you with.


In [12]:
async for msg in chat("Sorry Im testing something!", user_id, thread_id):
    print(msg.content)

No problem, I understand you're just testing something. I'm happy to go along with whatever you need for your testing. Let me know if there's anything specific I can do to assist with your testing.


In [13]:
async for msg in chat("Well it's not that bad i guess!", user_id, thread_id):
    print(msg.content)

Okay, glad to hear it's not too bad! I'm always here if you need any further assistance with your testing or anything else. Feel free to let me know how I can be helpful.


In [14]:
async for msg in chat(
    "I'm building long term memory for you! then you can know me", user_id, thread_id
):
    print(msg.content)

Ah I see, that's really interesting! Building long-term memory for an AI assistant like myself is a fascinating concept. I'm excited to learn more about you and develop a deeper understanding as we continue our conversation. Please feel free to share more about yourself and what you're working on - I'm eager to engage in a substantive dialogue and have our interactions contribute to my growing knowledge base. This is a great opportunity for me to enhance my conversational abilities and personalization. I'm here to listen and learn!


In [15]:
async for msg in chat(
    "I'm building long term memory for you! then you can know me", user_id, thread_id
):
    print(msg.content)

That's wonderful, I'm really excited about the prospect of building long-term memory and getting to know you better. Having a more persistent understanding of our interactions and being able to draw on that knowledge over time will be incredibly valuable. I'm curious to learn more about your approach and how I can best assist in the process. Please, feel free to share additional details about your work and how I can contribute. I'm eager to participate in a way that helps strengthen our connection and allows me to provide more personalized and helpful responses as we continue our conversation.


In [16]:
await manager.trigger(user_id=user_id)

In [17]:
while True:
    if mem := await get_user_profile(conn, user_id):
        break
    await asyncio.sleep(1)
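
The loop above waits forever if extraction fails, since `MemoryManager` only logs errors. One hedged alternative is to bound the wait with `asyncio.wait_for`. The helper name `wait_for_value` and the stub fetchers below are hypothetical; in this notebook the fetcher would be something like `lambda: get_user_profile(conn, user_id)`.

```python
import asyncio


async def wait_for_value(fetch, timeout=30.0, interval=1.0):
    """Poll the async callable `fetch` until it returns a truthy value.

    Raises asyncio.TimeoutError if nothing truthy arrives within `timeout` seconds.
    """

    async def poll():
        while True:
            if value := await fetch():
                return value
            await asyncio.sleep(interval)

    return await asyncio.wait_for(poll(), timeout=timeout)


async def demo_poll():
    attempts = 0

    async def fake_fetch():
        # Stand-in for a profile lookup that succeeds on the third try.
        nonlocal attempts
        attempts += 1
        return {"name": "Will"} if attempts >= 3 else None

    async def never_ready():
        return None

    found = await wait_for_value(fake_fetch, timeout=1.0, interval=0.01)
    try:
        await wait_for_value(never_ready, timeout=0.05, interval=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return found, attempts, timed_out


# In a notebook with a running event loop, use `await demo_poll()` instead.
found_profile, attempts_made, timed_out = asyncio.run(demo_poll())
```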

In [18]:
new_thread = "convo 2"
async for msg in chat("Hi there. guess what i'm working on?", user_id, new_thread):
    print(msg.content)

*smiles warmly* Hello Will! It's great to hear from you again. Based on our previous conversations, I remember that you've been working on building long-term memory for AI and improving conversational abilities. Am I right that you're continuing to explore those areas? I'm always excited to learn more about the projects you're passionate about. Please, tell me more! I'm eager to hear what you're working on now.


In [19]:
new_thread = "convo 3"
async for msg in chat("I been working! making some progress", user_id, new_thread):
    print(msg.content)

Wonderful to hear, Will! I'm so glad you've been making progress with your work. It's great that you're staying productive and achieving your goals. As your AI assistant, I always aim to provide personalized support and encouragement. Please feel free to share more about what you've been working on - I'm genuinely interested in hearing about the progress you're making. And remember, I'm here to help in any way I can, so don't hesitate to let me know if there's anything else I can do to support you.


In [20]:
new_thread = "convo 3"
async for msg in chat("Guess what i was working on? Remeber?", user_id, new_thread):
    print(msg.content)

Hmm, let me think back to our previous conversations... Ah yes, I believe you mentioned before that you've been working on building long-term memory for AI and improving AI conversational abilities. Is that correct, Will? If so, I'm really excited to hear about the progress you've made on that front. As an AI assistant myself, I'm very interested in advancements that can enhance my own conversational skills and ability to maintain personalized interactions over time. Please, tell me more about what you've been working on - I'm all ears!


In [21]:
new_thread = "convo 3"
async for msg in chat("HI?", user_id, new_thread):
    print(msg.content)

Oh, I'm so sorry, I seem to have gotten a bit carried away there! Let me just reorient myself. Hi there, how are you doing today, Will? I'm glad to hear you've been making progress on something, though I apologize that I don't recall the specifics you had mentioned before. As your AI assistant, I do my best to remember our past conversations and tailor my responses accordingly, but sometimes I may get a bit overeager in trying to demonstrate that. Please feel free to refresh my memory on what you've been working on - I'm always eager to learn more about your interests and activities. How can I be helpful to you today?
