# Chatbot with Collection Schema 

## Review

We extended our chatbot to save semantic memories to a single [user profile](https://langchain-ai.github.io/langgraph/concepts/memory/#profile). 

We also introduced a library, [Trustcall](https://github.com/hinthornw/trustcall), to update this schema with new information. 

## Goals

Sometimes we want to save memories to a [collection](https://docs.google.com/presentation/d/181mvjlgsnxudQI6S3ritg9sooNyu4AcLLFH1UK0kIuk/edit#slide=id.g30eb3c8cf10_0_200) rather than a single profile. 

Here we'll update our chatbot to [save memories to a collection](https://langchain-ai.github.io/langgraph/concepts/memory/#collection).

We'll also show how to use [Trustcall](https://github.com/hinthornw/trustcall) to update this collection. 


In [1]:

%%capture --no-stderr
%pip install -U trustcall langchain_google_genai langgraph langchain_core

<class 'NoneType'>


In [2]:
from dotenv import load_dotenv
import os

load_dotenv()

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY is not set in the .env file.")

print("API key loaded from .env file.")

API key loaded from .env file.


## Defining a collection schema

Instead of storing user information in a fixed profile structure, we'll create a flexible collection schema to store memories about user interactions.

Each memory will be stored as a separate entry with a single `content` field for the main information we want to remember.

This approach allows us to build an open-ended collection of memories that can grow and change as we learn more about the user.

We can define a collection schema as a [Pydantic](https://docs.pydantic.dev/latest/) object. 

In [3]:
from typing import List
from pydantic import BaseModel, Field

# A single memory entry: one free-text fact about the user.
class Memory(BaseModel):
    content: str = Field(description="The main content of the memory. For example: likes, learning interests ...")

# A collection of memories that can grow over time.
class MemoryCollection(BaseModel):
    final_memory: List[Memory] = Field(description="A list of memories about the user.")
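
To see the shape of this data without calling a model, the sketch below mirrors the two schemas with standard-library dataclasses. The names `MemorySketch` and `MemoryCollectionSketch` are illustrative stand-ins and are not part of the notebook or of Pydantic.

```python
from dataclasses import dataclass, field, asdict
from typing import List


@dataclass
class MemorySketch:
    # Hypothetical stand-in for the Pydantic `Memory` model above.
    content: str


@dataclass
class MemoryCollectionSketch:
    # Hypothetical stand-in for `MemoryCollection`: an open-ended list of entries.
    final_memory: List[MemorySketch] = field(default_factory=list)


collection = MemoryCollectionSketch()
collection.final_memory.append(MemorySketch("User's name is Arman."))
collection.final_memory.append(MemorySketch("User wants to learn AI."))

# Each entry stays independent, so the collection can grow one memory at a time.
as_dict = asdict(collection)
print(as_dict)
```

Unlike a fixed profile, nothing in this structure limits how many entries can be added or what each one describes.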

We can use the [`with_structured_output`](https://python.langchain.com/docs/concepts/structured_outputs/#recommended-usage) method of LangChain's [chat model](https://python.langchain.com/docs/concepts/chat_models/) interface to enforce structured output.

In [5]:
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
model = ChatGoogleGenerativeAI(model='gemini-1.5-flash',api_key=GEMINI_API_KEY)
model_with_structure = model.with_structured_output(Memory)
structured_output = model_with_structure.invoke([HumanMessage("My name is arman , I want to learn AI and i like programming .I am from pakistan. I am 23 year's old")])
structured_output.content

"My name is arman , I want to learn AI and i like programming .I am from pakistan. I am 23 year's old"

In [None]:
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from typing import TypedDict, List
from pydantic import BaseModel,Field

class Memory(BaseModel):
    content: str = Field(description="The main content of the memory. For example: likes, learning interests ...")
class MemoryCollection(BaseModel): 
    final_memory: List[Memory] = Field(description='A list of memories about the user.')

model = ChatGoogleGenerativeAI(model='gemini-1.5-flash',api_key=GEMINI_API_KEY)
model_with_structure = model.with_structured_output(MemoryCollection)
structured_output = model_with_structure.invoke([HumanMessage("My name is arman , I want to learn AI and i like programming . I am from pakistan. I am 23 year's old")])
structured_output.final_memory[0].content

We can use `model_dump()` to serialize a Pydantic model instance into a Python dictionary.

In [None]:
# `MemoryCollection` stores its entries in `final_memory`, so index into that list.
structured_output.final_memory[0].model_dump()

## Updating collection schema

We discussed the challenges with updating a profile schema in the last lesson. 

The same challenges apply to collections! 

We want the ability to update the collection with new memories as well as update existing memories in the collection. 

Now we'll show that [Trustcall](https://github.com/hinthornw/trustcall) can also be used to update a collection. 

This enables both the addition of new memories and [updating existing memories in the collection](https://github.com/hinthornw/trustcall?tab=readme-ov-file#simultanous-updates--insertions).

Let's define a new extractor with Trustcall. 

As before, we provide the schema for each memory, `Memory`.  

This time, we also supply `enable_inserts=True` to allow the extractor to insert new memories into the collection. 

In [None]:
from trustcall import create_extractor
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from typing import TypedDict, List
from pydantic import BaseModel,Field

class Memory(BaseModel):
    content: str = Field(description="The main content of the memory. For example: likes, learning interests ...")

# enable_inserts=True lets the extractor add new memories to the collection.
trust_call = create_extractor(model, tools=[Memory], tool_choice="Memory", enable_inserts=True)

instruction = """Extract memories from the following conversation:"""

  
chatting = [HumanMessage(content="Hi, I'm arman."),
            AIMessage(content="Nice to meet you, arman."),
            HumanMessage(content="I want to learn AI and I like programming. I am from Pakistan. I am 23 years old.")]
final_msg = {"messages": [SystemMessage(content=instruction)] + chatting}

resp = trust_call.invoke(final_msg)
 
 


In [None]:
existing_memories=[(str(i),"Memory",memory.model_dump()) for i, memory in enumerate(resp["responses"])] if resp["responses"] else None
print(existing_memories)
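
The `existing` argument built above is a list of `(id, schema name, document)` tuples. The sketch below builds the same structure from plain dictionaries so its shape can be inspected without running the extractor. `fake_responses` is hypothetical sample data standing in for what `memory.model_dump()` would return.

```python
# Hypothetical stand-ins for the dictionaries that `memory.model_dump()` would return.
fake_responses = [
    {"content": "User's name is Arman."},
    {"content": "User is 23 years old and from Pakistan."},
]

# Assumed to correspond to the schema passed to the extractor in `tools=[Memory]`.
schema_name = "Memory"

# Same comprehension shape as in the cell above: (document id, schema name, document).
existing_memories = (
    [(str(i), schema_name, doc) for i, doc in enumerate(fake_responses)]
    if fake_responses
    else None
)

for doc_id, name, doc in existing_memories:
    print(doc_id, name, doc)
```

The string ids give each memory a stable handle, so a later update can refer to a specific entry instead of rewriting the whole collection.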

In [None]:
new_conversation = [AIMessage(content="Programming is a broad field today, but AI may replace jobs on small-scale projects."),
                    HumanMessage(content="I want to go to Islamabad during the summer holidays.")]
 
sys_msg = """Update existing memories and create new ones based on the following conversation:"""
final_msg = {"messages": [SystemMessage(content=sys_msg)] + new_conversation,'existing':existing_memories}
new_resp = trust_call.invoke(final_msg)
print(new_resp)

In [None]:
for m in new_resp["messages"]:
    m.pretty_print()

## Chatbot with collection schema updating

Now, let's bring Trustcall into our chatbot to create and update a memory collection.

In [None]:
from IPython.display import Image, display
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig
from pydantic import BaseModel, Field 
from typing import List
import uuid
from langchain_core.messages import merge_message_runs

class Memory(BaseModel):
    content: str = Field(description="The main content of the memory. For example: User expressed interest in learning about French.")

MODEL_SYSTEM_MESSAGE = """You are a helpful chatbot. You are designed to be a companion to a user. 

You have a long term memory which keeps track of information you learn about the user over time.

Current Memory (may include updated memories from this conversation): 

{memory}"""

trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    # Allow the extractor to insert new memories, not only update existing ones.
    enable_inserts=True,
)
TRUSTCALL_INSTRUCTION = """Reflect on the following interaction. 

Use the provided tools to retain any necessary memories about the user. 

Use parallel tool calling to handle updates and insertions simultaneously:"""

def call_model(state:MessagesState,config:RunnableConfig,store:BaseStore):
  user_id = config["configurable"]["user_id"]
  namespace = ("memory", user_id) 

  existing_memory = store.search(namespace) 
  formatted_memory = "\n".join(f"- {mem.value['content']}" for mem in existing_memory)
 
  sys_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)
 
  messages = [SystemMessage(content=sys_msg)] + state["messages"]

  resp = model.invoke(messages)
  return {"messages":resp}



CREATE_MEMORY_INSTRUCTION = """You are collecting information about
the user to personalize your responses.

CURRENT USER INFORMATION:
{memory}

INSTRUCTIONS:
1. Review the chat history below carefully
2. Identify new information about the user, such as:
   - Personal details (name, location)
   - Preferences (likes, dislikes)
   - Interests and hobbies
   - Past experiences
   - Goals or future plans
3. Merge any new information with existing memory
4. Format the memory as a clear, bulleted list
5. If new information conflicts with existing memory, keep the most recent version

Remember: Only include factual information directly stated by the user. Do not make assumptions or inferences.

Based on the chat history below, please update the user information:"""
def call_summary(state: MessagesState, config: RunnableConfig, store: BaseStore):
  """Reflect on the chat history and update the memory collection."""
  user_id = config["configurable"]["user_id"]
  namespace = ("memory", user_id)

  # Retrieve the memories already stored for this user.
  existing_items = store.search(namespace)

  # Format them as (id, schema name, value) tuples for the Trustcall extractor.
  existing_memories = [(existing_item.key, "Memory", existing_item.value) for existing_item in existing_items] if existing_items else None

  # Prepend the Trustcall instruction and merge consecutive messages of the same type.
  updated_messages = list(merge_message_runs(messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"]))

  result = trustcall_extractor.invoke({"messages": updated_messages,
                                      "existing": existing_memories})

  # Save each memory: reuse the document id when present, otherwise create a new UUID.
  for r, rmeta in zip(result["responses"], result["response_metadata"]):
      store.put(namespace, rmeta.get("json_doc_id", str(uuid.uuid4())), r.model_dump(mode="json"))

  



graph = StateGraph(MessagesState)
graph.add_node('call_model',call_model)
graph.add_node('call_summary',call_summary)

graph.add_edge(START,'call_model') 
graph.add_edge('call_model','call_summary')
graph.add_edge('call_summary',END)

short_term_memory = MemorySaver()
long_term_memory = InMemoryStore()
compiled_graph = graph.compile(checkpointer=short_term_memory,store=long_term_memory) 

display(Image(compiled_graph.get_graph().draw_mermaid_png()))
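
The write path in `call_summary` behaves like an upsert: as the `rmeta.get("json_doc_id", ...)` call suggests, a response that carries a document id overwrites that entry, while a response without one is stored under a fresh UUID. The sketch below reproduces that logic, plus the bullet formatting from `call_model`, with a plain dictionary. `DictStore`, `save_memories`, and the sample responses are hypothetical stand-ins, not LangGraph or Trustcall APIs.

```python
import uuid
from typing import Dict, List, Tuple


class DictStore:
    """Hypothetical in-memory stand-in for a namespaced key-value store."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, ...], Dict[str, dict]] = {}

    def put(self, namespace: Tuple[str, ...], key: str, value: dict) -> None:
        # Writing to an existing key overwrites it; a new key adds an entry.
        self._data.setdefault(namespace, {})[key] = value

    def search(self, namespace: Tuple[str, ...]) -> List[Tuple[str, dict]]:
        return list(self._data.get(namespace, {}).items())


def save_memories(store, namespace, responses, metadata) -> None:
    # Mirror of the loop in `call_summary`: reuse the id when one is present.
    for value, meta in zip(responses, metadata):
        key = meta.get("json_doc_id", str(uuid.uuid4()))
        store.put(namespace, key, value)


store = DictStore()
namespace = ("memory", "1")

# First turn: the metadata has no id, so a new entry is inserted.
save_memories(store, namespace, [{"content": "User likes programming."}], [{}])
first_key = store.search(namespace)[0][0]

# Second turn: one update (id present) and one insertion (id absent).
save_memories(
    store,
    namespace,
    [{"content": "User likes Python programming."}, {"content": "User is from Sialkot."}],
    [{"json_doc_id": first_key}, {}],
)

# Same bullet formatting that `call_model` uses for the system prompt.
formatted = "\n".join(f"- {value['content']}" for _, value in store.search(namespace))
print(formatted)
```

After both turns the store holds two entries rather than three, because the second turn replaced the first memory in place.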



In [None]:
config = {"configurable": {"thread_id": "1", "user_id": "1"}} 
input_messages = [HumanMessage(content="Hi, my name is Arman, and I am from Sialkot. I am 23 years old.")] 
for chunk in compiled_graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

In [None]:
config = {"configurable": {"thread_id": "1", "user_id": "1"}} 
input_messages = [HumanMessage(content="I like programming")] 
for chunk in compiled_graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


In [None]:
user_id = "1"
namespace = ("memory", user_id) 
memories = long_term_memory.search(namespace)
for m in memories:
    print(m.dict())

In [None]:
input_messages = [HumanMessage(content="I want to buy shoes and a dress.")] 
for chunk in compiled_graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

In [None]:
input_messages = [HumanMessage(content="I'm looking for black sneakers with a budget of $300 and a casual dress in white, with a budget of $400 and a waist size of 36 ")] 
for chunk in compiled_graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

In [None]:
config = {"configurable": {"thread_id": "1", "user_id": "1"}} 
input_messages = [HumanMessage(content="what do you know about me")] 
for chunk in compiled_graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Continue the conversation in a new thread.

In [None]:
config = {"configurable": {"thread_id": "2", "user_id": "1"}} 
input_messages = [HumanMessage(content="what do you know about me")] 
for chunk in compiled_graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
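
The new thread starts with an empty message history, yet the chatbot can still draw on what it learned, because memories are stored under `user_id` rather than `thread_id`. A minimal sketch of that separation follows, using two hypothetical dictionaries in place of the checkpointer and the store; `record_turn` is an illustrative helper, not a LangGraph function.

```python
from collections import defaultdict

# Hypothetical stand-ins: per-thread chat history vs. per-user long-term memories.
thread_history = defaultdict(list)  # keyed by thread_id (short-term)
user_memories = defaultdict(list)   # keyed by ("memory", user_id) (long-term)


def record_turn(config: dict, message: str, memory: str) -> None:
    cfg = config["configurable"]
    thread_history[cfg["thread_id"]].append(message)
    user_memories[("memory", cfg["user_id"])].append(memory)


# A turn in thread "1" for user "1".
record_turn(
    {"configurable": {"thread_id": "1", "user_id": "1"}},
    "I like programming",
    "User likes programming.",
)

# Same user, new thread.
new_cfg = {"configurable": {"thread_id": "2", "user_id": "1"}}["configurable"]
history_in_new_thread = thread_history[new_cfg["thread_id"]]
memories_in_new_thread = user_memories[("memory", new_cfg["user_id"])]

print(history_in_new_thread)   # no messages carried over from thread "1"
print(memories_in_new_thread)  # memories are still available for user "1"
```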

In [None]:
from langchain_core.messages import (
    merge_message_runs,
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolCall,
)

messages = [
    SystemMessage("you're a good assistant."),
    HumanMessage("what's your favorite color", id="foo",),
    HumanMessage("wait your favorite food", id="bar",),
    AIMessage(
        "my favorite colo",
        tool_calls=[ToolCall(name="blah_tool", args={"x": 2}, id="123", type="tool_call")],
        id="baz",
    ),
    AIMessage(
        [{"type": "text", "text": "my favorite dish is lasagna"}],
        tool_calls=[ToolCall(name="blah_tool", args={"x": -10}, id="456", type="tool_call")],
        id="blur",
    ),
]

merge_message_runs(messages)
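
`merge_message_runs` combines consecutive messages of the same type into one, and `call_summary` applies it to the message list before invoking the extractor. The sketch below imitates only that core idea on `(role, text)` tuples. The helper `merge_runs` is hypothetical and ignores details the real function handles, such as tool calls and content blocks.

```python
from itertools import groupby
from typing import List, Tuple

Message = Tuple[str, str]  # (role, text)


def merge_runs(messages: List[Message]) -> List[Message]:
    """Join each run of consecutive same-role messages with a newline."""
    merged = []
    for role, run in groupby(messages, key=lambda m: m[0]):
        merged.append((role, "\n".join(text for _, text in run)))
    return merged


messages = [
    ("system", "you're a good assistant."),
    ("human", "what's your favorite color"),
    ("human", "wait your favorite food"),
    ("ai", "my favorite dish is lasagna"),
]
merged = merge_runs(messages)
print(merged)
```

The two consecutive human messages collapse into a single entry, while the system and AI messages pass through unchanged.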

### LangSmith 

https://smith.langchain.com/public/c87543ec-b426-4a82-a3ab-94d01c01d9f4/r

## Studio

![Screenshot 2024-10-30 at 11.29.25 AM.png](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/6732d0876d3daa19fef993ba_Screenshot%202024-11-11%20at%207.50.21%E2%80%AFPM.png)