# Memory Agent

## Review

We created a chatbot that saves semantic memories to a single [user profile](https://docs.langchain.com/oss/python/concepts/memory#profile) or [collection](https://docs.langchain.com/oss/python/concepts/memory#collection).

We introduced [Trustcall](https://github.com/hinthornw/trustcall) as a way to update either schema.

## Goals

Now, we're going to pull together the pieces we've learned to build an agent with long-term memory.

Our agent, `task_mAIstro`, will help us manage a ToDo list! 

The chatbots we built previously *always* reflected on the conversation and saved memories. 

`task_mAIstro` will decide *when* to save memories (items to our ToDo list).

The chatbots we built previously always saved one type of memory, a profile or a collection. 

`task_mAIstro` can decide to save to either a user profile or a collection of ToDo items.

In addition to semantic memory, `task_mAIstro` will also manage procedural memory.

This allows the user to update their preferences for creating ToDo items. 

In [1]:
import uuid
from pprint import pprint
from helper import *
from pydantic import BaseModel, Field
from trustcall import create_extractor
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, merge_message_runs
from typing import TypedDict, Literal, Optional, List
from IPython.display import Image, display
from datetime import datetime
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, END, START
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

## Visibility into Trustcall updates

Trustcall creates and updates JSON documents that conform to a schema.

What if we want visibility into the *specific changes* made by Trustcall?

For example, we saw before that Trustcall has some of its own tools to:

* Self-correct from validation failures -- [see trace example here](https://smith.langchain.com/public/5cd23009-3e05-4b00-99f0-c66ee3edd06e/r/9684db76-2003-443b-9aa2-9a9dbc5498b7) 
* Update existing documents -- [see trace example here](https://smith.langchain.com/public/f45bdaf0-6963-4c19-8ec9-f4b7fe0f68ad/r/760f90e1-a5dc-48f1-8c34-79d6a3414ac3)

Visibility into these tools can be useful for the agent we're going to build.

Below, we'll show how to do this!

In [2]:
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )

class MemoryCollection(BaseModel):
    memories: List[Memory] = Field(
        description="A list of memories about the user."
    )

When storing objects (e.g., memories) in the [Store](https://reference.langchain.com/python/langgraph/store/?h=basestor#langgraph.store.base.BaseStore), we provide:

- The `namespace` for the object, a tuple (similar to directories)
- The object `key` (similar to filenames)
- The object `value` (similar to file contents)

We use the [put](https://reference.langchain.com/python/langgraph/store/?h=basestor#langgraph.store.base.BaseStore.put) method to save an object to the store by `namespace` and `key`.
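To make the directory/filename/contents analogy concrete, here is a minimal sketch of namespace and key addressing. `TinyStore` is a hypothetical stand-in written for this illustration only; it is not LangGraph's implementation, and it mimics just the `put`, `get`, and `search` behaviour described here under the assumption that `search` matches on a namespace prefix.

```python
class TinyStore:
    """Hypothetical stand-in that mimics namespace/key/value addressing."""

    def __init__(self):
        # Maps (namespace tuple, key) -> value dict
        self._data = {}

    def put(self, namespace, key, value):
        # Writing to an existing (namespace, key) pair overwrites the value
        self._data[(tuple(namespace), key)] = value

    def get(self, namespace, key):
        # Returns None when nothing is stored at this address
        return self._data.get((tuple(namespace), key))

    def search(self, namespace_prefix):
        # Return every (namespace, key, value) whose namespace starts with the prefix
        prefix = tuple(namespace_prefix)
        return [
            (ns, key, value)
            for (ns, key), value in self._data.items()
            if ns[:len(prefix)] == prefix
        ]


store = TinyStore()
user_id = "1"
store.put(("memories", user_id), "a1", {"content": "User likes biking."})
store.put(("memories", user_id), "a2", {"content": "User lives in San Francisco."})
store.put(("memories", "2"), "b1", {"content": "A memory for a different user."})

print(store.get(("memories", user_id), "a1"))
print(len(store.search(("memories", user_id))))
```

Including the user ID in the namespace keeps each user's memories in their own "directory", so a search scoped to one user does not return another user's items.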

In [3]:
# Inspect the tool calls made by Trustcall
class Spy:
    def __init__(self):
        self.called_tools = []

    def __call__(self, run):
        # Collect information about the tool calls made by the extractor.
        q = [run]
        while q:
            r = q.pop()
            if r.child_runs:
                q.extend(r.child_runs)
            if r.run_type == "chat_model":
                self.called_tools.append(
                    r.outputs["generations"][0][0]["message"]["kwargs"]["tool_calls"]
                )
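To see what `Spy` collects without calling a model, here is a minimal sketch that feeds it hand-built run objects. `fake_run` and the `SimpleNamespace` objects are hypothetical stand-ins that carry only the attributes `Spy` reads (`run_type`, `child_runs`, `outputs`); real traced runs contain much more. The `Spy` class is repeated so the block runs on its own.

```python
from types import SimpleNamespace


class Spy:
    """Same traversal as the Spy above, repeated so this sketch is self-contained."""

    def __init__(self):
        self.called_tools = []

    def __call__(self, run):
        # Walk the run tree and record tool calls from every chat model run
        q = [run]
        while q:
            r = q.pop()
            if r.child_runs:
                q.extend(r.child_runs)
            if r.run_type == "chat_model":
                self.called_tools.append(
                    r.outputs["generations"][0][0]["message"]["kwargs"]["tool_calls"]
                )


def fake_run(run_type, child_runs=None, tool_calls=None):
    """Hypothetical stand-in for a traced run (illustration only)."""
    outputs = None
    if tool_calls is not None:
        outputs = {"generations": [[{"message": {"kwargs": {"tool_calls": tool_calls}}}]]}
    return SimpleNamespace(run_type=run_type, child_runs=child_runs or [], outputs=outputs)


# A chain run that wraps another chain run, which wraps one chat model run
chat = fake_run("chat_model", tool_calls=[{"name": "Memory", "args": {"content": "demo"}}])
root = fake_run("chain", child_runs=[fake_run("chain", child_runs=[chat])])

demo_spy = Spy()
demo_spy(root)
print(demo_spy.called_tools)
```

Each chat model run contributes one list of tool calls, which is why `called_tools` ends up as a list of lists.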

In [4]:
spy = Spy()

In [5]:
# Initialize the model
model = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
model.invoke("Hello").content

'Hello! How can I assist you today?'

In [6]:
trust_call_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True
)

# Add the spy as a listener
trust_call_extractor_with_listner = trust_call_extractor.with_listeners(on_end=spy)

In [8]:
# Instruction
instruction = """Extract memories from the following conversation:"""

# Conversation
conversation = [
    HumanMessage(content="Hi, I'm Umer."), 
    AIMessage(content="Nice to meet you, Umer."), 
    HumanMessage(content="This morning I had a nice bike ride in San Francisco.")
]

# Invoke the extractor
result = trust_call_extractor.invoke({
    "messages": [SystemMessage(content=instruction)] + conversation
})

In [9]:
# Messages contain the tool calls
for m in result['messages']:
    m.pretty_print()

Tool Calls:
  Memory (call_cnpn4mv0MbmqDUfdg1l1Km0z)
 Call ID: call_cnpn4mv0MbmqDUfdg1l1Km0z
  Args:
    content: User had a nice bike ride in San Francisco this morning.


In [10]:
# Responses contain the memories that adhere to the schema
for m in result["responses"]: 
    print(m)

content='User had a nice bike ride in San Francisco this morning.'


In [11]:
# Metadata contains the tool call ID
for m in result["response_metadata"]: 
    print(m)

{'id': 'call_cnpn4mv0MbmqDUfdg1l1Km0z'}


In [12]:
# Update the conversation
updated_conversation = [
    AIMessage(content="That's great, what did you do after?"), 
    HumanMessage(content="I went to Tartine and ate a croissant."),                        
    AIMessage(content="What else is on your mind?"),
    HumanMessage(content="I was thinking about Japan, and going back this winter!")
]

# Update the instruction
system_msg = """Update existing memories and create new ones based on the following conversation:"""

In [13]:
# We'll save existing memories, giving them an ID, key (tool name), and value
tool_name = "Memory"
existing_mem = [(str(i), tool_name, m) for i, m in enumerate(result['responses'])] if result['responses'] else []
existing_mem

[('0',
  'Memory',
  Memory(content='User had a nice bike ride in San Francisco this morning.'))]

In [14]:
# Invoke the extractor with our updated conversation and existing memories
result = trust_call_extractor_with_listner.invoke({
    "messages": updated_conversation,
    "existing": existing_mem
})

In [15]:
result.keys()

dict_keys(['messages', 'responses', 'response_metadata', 'attempts'])

In [16]:
for m in result['response_metadata']:
    print(m)

{'id': 'call_OWEe21yDLQbWkNTtD5zahECY', 'json_doc_id': '0'}
{'id': 'call_VXHdQtBLOu2jxkkPeQIgXb6f'}
{'id': 'call_TbquOFUIluO7KzSMrjqYXXe6'}


In [17]:
# Messages contain the tool calls
for m in result['messages']:
    m.pretty_print()

Tool Calls:
  Memory (call_OWEe21yDLQbWkNTtD5zahECY)
 Call ID: call_OWEe21yDLQbWkNTtD5zahECY
  Args:
    content: User went to Tartine and ate a croissant.
    -: {'content': 'User is thinking about going back to Japan this winter.'}
  Memory (call_VXHdQtBLOu2jxkkPeQIgXb6f)
 Call ID: call_VXHdQtBLOu2jxkkPeQIgXb6f
  Args:
    content: User went to Tartine and ate a croissant.
  Memory (call_TbquOFUIluO7KzSMrjqYXXe6)
 Call ID: call_TbquOFUIluO7KzSMrjqYXXe6
  Args:
    content: User is thinking about going back to Japan this winter.


In [18]:
# Parsed responses
for m in result["responses"]:
    print(m)

content='User went to Tartine and ate a croissant.'
content='User went to Tartine and ate a croissant.'
content='User is thinking about going back to Japan this winter.'


In [19]:
# Inspect the tool calls made by Trustcall
spy.called_tools

[[{'name': 'PatchDoc',
   'args': {'json_doc_id': '0',
    'planned_edits': '1. Replace the existing content with a new memory about visiting Tartine and eating a croissant. 2. Add a new memory about planning a trip to Japan this winter.',
    'patches': [{'op': 'replace',
      'path': '/content',
      'value': 'User went to Tartine and ate a croissant.'},
     {'op': 'add',
      'path': '/-',
      'value': {'content': 'User is thinking about going back to Japan this winter.'}}]},
   'id': 'call_OWEe21yDLQbWkNTtD5zahECY',
   'type': 'tool_call'},
  {'name': 'Memory',
   'args': {'content': 'User went to Tartine and ate a croissant.'},
   'id': 'call_VXHdQtBLOu2jxkkPeQIgXb6f',
   'type': 'tool_call'},
  {'name': 'Memory',
   'args': {'content': 'User is thinking about going back to Japan this winter.'},
   'id': 'call_TbquOFUIluO7KzSMrjqYXXe6',
   'type': 'tool_call'}]]
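The `patches` in the `PatchDoc` call have the `op` / `path` / `value` shape of JSON Patch operations. The sketch below assumes that reading and handles only top-level keys; `apply_top_level_patches` is a hypothetical helper, not part of Trustcall. It suggests why the first tool call in the earlier output carries an extra `-` key: when the target is an object rather than an array, the path `/-` names a member called `-` instead of appending.

```python
import copy


def apply_top_level_patches(doc, patches):
    """Apply 'replace' and 'add' operations whose path names a top-level key.

    Simplified illustration only: real JSON Patch also supports nested paths,
    array indices, and further operations such as 'remove'.
    """
    result = copy.deepcopy(doc)
    for patch in patches:
        key = patch["path"][1:]  # drop the leading "/"
        if patch["op"] == "replace":
            if key not in result:
                raise KeyError(f"cannot replace missing key {key!r}")
            result[key] = patch["value"]
        elif patch["op"] == "add":
            result[key] = patch["value"]
        else:
            raise ValueError(f"unsupported op {patch['op']!r}")
    return result


existing = {"content": "User had a nice bike ride in San Francisco this morning."}
patches = [
    {"op": "replace", "path": "/content",
     "value": "User went to Tartine and ate a croissant."},
    {"op": "add", "path": "/-",
     "value": {"content": "User is thinking about going back to Japan this winter."}},
]

patched = apply_top_level_patches(existing, patches)
print(patched)
```

The `Memory` schema only declares `content`, which would explain why the parsed response for this document shows just the replaced `content` field.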

In [20]:
def extract_tool_info(tool_calls, schema_name="Memory"):
    """Extract information from tool calls for both patches and new memories.
    
    Args:
        tool_calls: List of tool calls from the model
        schema_name: Name of the schema tool (e.g., "Memory", "ToDo", "Profile")
    """

    # Initialize list of changes
    changes = []
    
    for call_group in tool_calls:
        for call in call_group:
            if call['name'] == 'PatchDoc':
                changes.append({
                    'type': 'update',
                    'doc_id': call['args']['json_doc_id'],
                    'planned_edits': call['args']['planned_edits'],
                    'value': call['args']['patches'][0]['value']
                })
            elif call['name'] == schema_name:
                changes.append({
                    'type': 'new',
                    'value': call['args']
                })

    # Format results as a single string
    result_parts = []
    for change in changes:
        if change['type'] == 'update':
            result_parts.append(
                f"Document {change['doc_id']} updated:\n"
                f"Plan: {change['planned_edits']}\n"
                f"Added content: {change['value']}"
            )
        else:
            result_parts.append(
                f"New {schema_name} created:\n"
                f"Content: {change['value']}"
            )
    
    return "\n\n".join(result_parts)

In [21]:
# Inspect spy.called_tools to see exactly what happened during the extraction
schema_name = "Memory"
changes = extract_tool_info(spy.called_tools, schema_name)
print(changes)

Document 0 updated:
Plan: 1. Replace the existing content with a new memory about visiting Tartine and eating a croissant. 2. Add a new memory about planning a trip to Japan this winter.
Added content: User went to Tartine and ate a croissant.

New Memory created:
Content: {'content': 'User went to Tartine and ate a croissant.'}

New Memory created:
Content: {'content': 'User is thinking about going back to Japan this winter.'}


## Creating an agent

There are many different agent architectures to choose from.

Here, we'll implement something simple, a [ReAct](https://docs.langchain.com/oss/python/langgraph/workflows-agents#agents) agent.

This agent will be a helpful companion for creating and managing a ToDo list.

This agent can decide to update three types of long-term memory:

(a) Create or update a user `profile` with general user information 

(b) Add or update items in a ToDo list `collection`

(c) Update its own `instructions` on how to update items in the ToDo list

In [22]:
model = ChatOpenAI(model="gpt-4o", temperature=0)

In [23]:
class UpdateMemory(TypedDict):
    """ Decision on what memory type to update """
    update_type: Literal['user', 'todo', 'instructions']
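The model signals its decision by calling `UpdateMemory` with an `update_type` argument. As a minimal sketch of how that decision could be read and dispatched, the function below inspects tool-call dicts of the shape shown in the `spy.called_tools` output. The node names and `pick_next_node` itself are hypothetical, chosen only for this illustration.

```python
from typing import Optional

# Hypothetical node names, used only for this illustration
NODE_FOR_UPDATE_TYPE = {
    "user": "update_profile",
    "todo": "update_todos",
    "instructions": "update_instructions",
}


def pick_next_node(tool_calls: list) -> Optional[str]:
    """Return the node for the first UpdateMemory call, or None if there is none."""
    for call in tool_calls:
        if call["name"] == "UpdateMemory":
            update_type = call["args"]["update_type"]
            if update_type not in NODE_FOR_UPDATE_TYPE:
                raise ValueError(f"unknown update_type: {update_type!r}")
            return NODE_FOR_UPDATE_TYPE[update_type]
    # No memory update requested: the agent simply replied to the user
    return None


example_calls = [
    {"name": "UpdateMemory", "args": {"update_type": "todo"},
     "id": "call_demo", "type": "tool_call"}
]
print(pick_next_node(example_calls))
print(pick_next_node([]))
```

Returning `None` when there is no tool call reflects the goal stated earlier: the agent decides *when* to save, so many turns will not touch memory at all.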

In [24]:
# User profile schema
class Profile(BaseModel):
    """This is the profile of the user you are chatting with"""

    name: Optional[str] = Field(
        description="The user's name", default=None
    )
    location: Optional[str] = Field(
        description="The user's location", default=None
    )
    job: Optional[str] = Field(
        description="The user's job", default=None
    )
    connections: list[str] = Field(
        description="Personal connection of the user, such as family members, friends, or coworkers",
        default_factory=list
    )
    interests: list[str] = Field(
        description="Interests that the user has", 
        default_factory=list
    )

In [26]:
# ToDo schema
class ToDo(BaseModel):
    task: str = Field(
        description="The task to be completed."
    )
    time_to_complete: Optional[int] = Field(
        description="Estimated time to complete the task (minutes)."
    )
    deadline: Optional[datetime] = Field(
        description="When the task needs to be completed by (if applicable)",
        default=None
    )
    solutions: list[str] = Field(
        description="List of specific, actionable solutions (e.g., specific ideas, service providers, or concrete options relevant to completing the task)",
        min_length=1,
        default_factory=list
    )
    status: Literal["not started", "in progress", "done", "archived"] = Field(
        description="Current status of the task",
        default="not started"
    )

In [28]:
# Create the Trustcall extractor for updating the user profile 
profile_extractor = create_extractor(
    model,
    tools=[Profile],
    tool_choice="Profile"
)

### **Node definition**

In [30]:
def task_mAIstro(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Load memories from the store and use them to personalize the chatbot's response."""
    # Get the user ID from the config
    user_id = config["configurable"]["user_id"]
    
    # Retrieve profile memory from the store
    namespace = ("profile", user_id)
    memories = store.search(namespace)
    if memories:
        user_profile = memories[0].value
    else:
        user_profile = None

    # Retrieve task memory from the store
    namespace = ("todo", user_id)
    memories = store.search(namespace)
    todo = "\n".join(f"{mem.value}" for mem in memories)

    # Retrieve custom instructions
    namespace = ("instructions", user_id)
    memories = store.search(namespace)
    if memories:
        instructions = memories[0].value
    else:
        instructions = ""
    
    # Build the system message from the stored profile, ToDo list, and instructions
    system_msg = [SystemMessage(
        MODEL_SYSTEM_MESSAGE.format(
            user_profile=user_profile,
            todo=todo,
            instructions=instructions
        )
    )]
    
    # Respond using memory as well as the chat history
    response = model.bind_tools([UpdateMemory], parallel_tool_calls=False).invoke(
        system_msg + state["messages"]
    )

    return { "messages": [response] }
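The node above flattens three kinds of stored memory into one system prompt. The sketch below isolates that step so it can be run without a model or a store. `DEMO_SYSTEM_MESSAGE` is a hypothetical template standing in for `MODEL_SYSTEM_MESSAGE`, which is not defined in this part of the notebook, and the `SimpleNamespace` items are stand-ins that expose only the `.value` attribute the node reads.

```python
from types import SimpleNamespace

# Hypothetical template; the real MODEL_SYSTEM_MESSAGE is defined elsewhere
DEMO_SYSTEM_MESSAGE = """You are a helpful ToDo assistant.

<user_profile>
{user_profile}
</user_profile>

<todo>
{todo}
</todo>

<instructions>
{instructions}
</instructions>"""


def build_system_prompt(profile_items, todo_items, instruction_items):
    """Mirror the node's fallbacks: None for a missing profile, '' for missing instructions."""
    user_profile = profile_items[0].value if profile_items else None
    todo = "\n".join(f"{item.value}" for item in todo_items)
    instructions = instruction_items[0].value if instruction_items else ""
    return DEMO_SYSTEM_MESSAGE.format(
        user_profile=user_profile, todo=todo, instructions=instructions
    )


todo_items = [
    SimpleNamespace(value={"task": "Book swim lessons"}),
    SimpleNamespace(value={"task": "Renew passport"}),
]
prompt = build_system_prompt([], todo_items, [])
print(prompt)
```

With no profile stored, the literal string `None` is written into the prompt, and each ToDo item appears as the string form of its dict on its own line.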