In [1]:
from typing import Annotated, Literal, TypedDict, Sequence
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import sys
import os

load_dotenv()

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
# from langgraph.store.memory import InMemoryStore

sys.path.append(os.path.abspath(".."))
from services.long_memory_store import MemoryStore

import uuid
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

from langgraph.checkpoint.memory import MemorySaver

https://nbviewer.org/github/Coding-Crashkurse/Long-Term-Memory-Agent/blob/main/longtermmemory.ipynb

In [2]:
class ClassifyInformation(BaseModel):
    personal_info: Literal["yes", "no"] = Field(
        description="Indicates whether the information contains any personal user choices or preferences that could help an LLM provide more personalized responses over time for long-term memory use."
    )

In [3]:
# llm = ChatOpenAI(model="gpt-4o-mini", max_completion_tokens=50)

In [None]:
store = MemoryStore()
USER_ID = "2"
key = "semantic_memory"

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    personal_info_detected: Literal["yes", "no"] = Field(
        description="Indicates whether the information contains any personal user choices or preferences that could help an LLM provide more personalized responses over time for long-term memory use."
    )
    personal_info_extracted: str
    new_info: str
    collected_memories: str

## Identifying the inforamtion is  relevancy to stored in long-term memory

In [5]:
def personal_info_classifier(state: AgentState) -> AgentState:
    """
    Classifies if the last user message contains personal info.
    Uses few-shot examples to clarify what "personal info" is.
    """
    message = state["messages"][-1].content

    system_prompt = """You are a classifier that checks if a message contains personal info.

        Personal info includes:
        - Names (e.g., "John Smith")
        - Locations (e.g., "Berlin", "123 Main St")
        - Preferences or hobbies (e.g., "I love to code", "I prefer short replies")
        - Occupation, phone numbers, or unique IDs

        Examples:
        User: "My name is Thomas, I live in Vancouver."
        Classifier: "Yes"

        User: "I love pizza with extra cheese."
        Classifier: "Yes"

        User: "Hello, how are you?"
        Classifier: "No"

        User: "This is great weather."
        Classifier: "No"
    """

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            ("human", "{message}"),
        ]
    )
    llm = ChatOpenAI(model="gpt-4o-mini", max_completion_tokens=50)
    structured_llm = llm.with_structured_output(ClassifyInformation)
    chain = prompt | structured_llm

    result = chain.invoke({"message": message})
    state["personal_info_detected"] = result.personal_info
    return state

## If the answer of above node is true, we have to extract the informatiom

In [6]:
def personal_info_router(state: AgentState) -> Literal["extract_personal_info", "retrieve_memories"]:
    """
    If classified 'Yes', go to extraction. Otherwise skip directly to retrieving memories.
    """
    if state["personal_info_detected"].lower() == "yes":
        return "extract_personal_info"
    return "retrieve_memories"

In [7]:
def personal_info_extractor(state: AgentState) -> AgentState:
    """
    Extracts personal info from the user's message using few-shot examples.
    """
    message = state["messages"][-1].content

    # A few-shot style system prompt for extraction:
    extractor_system = """
    You are an intelligent extractor that identifies and summarizes personal user information for long-term memory storage. Your goal is to help the LLM provide more personalized responses in the future.

    Instructions:
    - Read the user's input carefully.
    - Extract any relevant personal details, such as:
    - Name, location, profession, and age
    - Interests, hobbies, goals, future plans
    - Food preferences (likes/dislikes)
    - Emotional states, values, or beliefs
    - Mention of past experiences (e.g., education, travel)
    - Any unique or identifying details

    Output Format:
    - Return a concise, comma-separated list of attributes in the format: 
    "Attribute: Value"
    - If multiple values exist, separate them with commas.
    - If no personal info is found, return: "No personal info found."

    Examples:

    User: "I am John and I live in Seattle."
    Output: "Name: John, Location: Seattle"

    User: "Hey, I'm Lucy. I love eating bananas!"
    Output: "Name: Lucy, Food Preference: bananas"

    User: "I am planning to apply to grad school in Germany next year."
    Output: "Goal: apply to grad school, Location: Germany, Timeline: next year"

    User: "Just a random statement about the weather."
    Output: "No personal info found."

    Now extract personal information from the following user input:
    """
    extractor_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", extractor_system),
            ("human", "Input: {message}"),
        ]
    )
    llm = ChatOpenAI(model="gpt-4o-mini", max_completion_tokens=50)
    chain = extractor_prompt | llm
    extracted_info = chain.invoke({"message": message})
    print("===========")
    print(f"Personal Info: {extracted_info.content.strip()}")
    print("===========")

    state["personal_info_extracted"] = extracted_info.content.strip()
    return state

## After extracting the information, Let's check for duplimacy

In [8]:
class InfoNoveltyGrade(BaseModel):
    score: Literal["yes", "no"] = Field()

def personal_info_duplicate_classifier(state: AgentState) -> AgentState:
    """
    Checks if the newly extracted info is already in the store or not.
    If 'Yes', it's new info. If 'No', it's a duplicate.
    """
    new_info = state.get("personal_info_extracted", "")
    namespace = ("user", USER_ID)
    key = "semantic_memory"
    results = store.get(namespace, key)
    old_info_list = [doc for doc in results]

    system_msg = """You are a classifier that checks if the new personal info is already stored.
If the new info adds anything new, respond 'Yes'. Otherwise 'No'."""

    old_info_str = "\n".join(old_info_list) if old_info_list else "No stored info so far."
    human_template = """New info:\n{new_info}\n
Existing memory:\n{old_info}\n
Answer ONLY 'Yes' if the new info is unique. Otherwise 'No'."""
    human_msg = human_template.format(new_info=new_info, old_info=old_info_str)

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_msg),
            ("human", "{human_msg}"),
        ]
    )
    llm = ChatOpenAI(model="gpt-4o-mini", max_completion_tokens=50).with_structured_output(InfoNoveltyGrade)
    chain = prompt | llm
    result = chain.invoke({"human_msg": human_msg})
    state["new_info"] = result.score.strip()
    return state

## If no dublicate found, add the memory to persistent storage

In [9]:
def personal_info_deduper_router(state: AgentState) -> Literal["personal_info_storer", "retrieve_memories"]:
    """
    If 'Yes', store the new info. Otherwise skip storing.
    """
    if state["new_info"].lower() == "yes":
        return "personal_info_storer"
    return "retrieve_memories"

## Retreiving the information about the user to call_model

In [10]:
def personal_info_storer(state: AgentState) -> AgentState:
    """
    Stores the new personal info in memory if it exists.
    """
    extracted = state.get("personal_info_extracted")
    
    if extracted:
        namespace = ("user", USER_ID)
        store.put(namespace, key, {"text": str(extracted)})
    return state

In [11]:
def retrieve_memories(state: AgentState) -> AgentState:
    """
    Retrieves all personal info from the store and aggregates into 'collected_memories'.
    """
    results = store.get(("user", USER_ID), key)
    memory_strs = [doc for doc in results]
    state["collected_memories"] = "\n".join(memory_strs)
    return state

## Logging the personal information

In [12]:
def log_personal_memory(state: AgentState) -> AgentState:
    """
    Logs the memory to stdout for debugging (optional).
    """
    print("----- Logging Personal Memory -----")
    if state["collected_memories"]:
        for i, line in enumerate(state["collected_memories"].split("\n"), start=1):
            print(f"[Memory {i}] {line}")
    else:
        print("[Memory] No personal info stored yet.")
    return state

## Call Model

In [13]:
def call_model(state: AgentState) -> AgentState:
    """
    Final LLM call that uses the collected memories in a SystemMessage.
    """
    personal_info = state.get("collected_memories", "")
    system_msg = SystemMessage(
        content=f"You are a helpful assistant. The user has shared these personal details:\n{personal_info}"
    )
    all_messages = [system_msg] + list(state["messages"])
    llm = ChatOpenAI(model="gpt-4o-mini", max_completion_tokens=50)
    response = llm.invoke(all_messages)
    state["messages"] = state["messages"] + [response]
    return state

In [14]:
workflow = StateGraph(AgentState)
workflow.add_node("personal_info_classifier", personal_info_classifier)
workflow.add_node("personal_info_extractor", personal_info_extractor)
workflow.add_node("personal_info_duplicate_classifier", personal_info_duplicate_classifier)
workflow.add_node("personal_info_storer", personal_info_storer)
workflow.add_node("retrieve_memories", retrieve_memories)
workflow.add_node("log_personal_memory", log_personal_memory)
workflow.add_node("call_model", call_model)

workflow.add_edge(START, "personal_info_classifier")
workflow.add_conditional_edges(
    "personal_info_classifier",
    personal_info_router,
    {
        "extract_personal_info": "personal_info_extractor",
        "retrieve_memories": "retrieve_memories",
    },
)

workflow.add_edge("personal_info_extractor", "personal_info_duplicate_classifier")

workflow.add_conditional_edges(
    "personal_info_duplicate_classifier",
    personal_info_deduper_router,
    {
        "personal_info_storer": "personal_info_storer",
        "retrieve_memories": "retrieve_memories",
    },
)

workflow.add_edge("personal_info_storer", "retrieve_memories")
workflow.add_edge("retrieve_memories", "log_personal_memory")
workflow.add_edge("log_personal_memory", "call_model")
workflow.add_edge("call_model", END)

workflow.set_entry_point("personal_info_classifier")

checkpointer = MemorySaver()
# graph = workflow.compile(checkpointer=checkpointer, store=store)
graph = workflow.compile(checkpointer=checkpointer)

In [15]:
# graph

In [19]:
input_data_1 = {"messages": [HumanMessage(content="hi, I want to visit new zealand")]}
graph.invoke(input=input_data_1, config={"configurable": {"thread_id": 1}})

Personal Info: "Goal: visit New Zealand"
----- Logging Personal Memory -----
[Memory 1] "Food Preference: bananas"
[Memory 2] "Goal: visit New Zealand"


{'messages': [HumanMessage(content='hi, I like to eat bananas', additional_kwargs={}, response_metadata={}, id='10c8a914-212a-451c-aba2-359088e8165d'),
  AIMessage(content="That's great! Bananas are not only delicious but also packed with nutrients. They're a good source of potassium, vitamin C, and dietary fiber. Do you have a favorite way to enjoy bananas?", additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 39, 'prompt_tokens': 38, 'total_tokens': 77, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_34a54ae93c', 'id': 'chatcmpl-BzUYzMc3UO17F6m907TrQIa6ELKKS', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--79a933a3-465c-482b-994b-0978cafd7673-0', usage_metadata={'input_tokens': 38, 'output_t

In [20]:
existing_memory = store.get(("user", USER_ID), key)

In [21]:
existing_memory

['"Food Preference: bananas"', '"Goal: visit New Zealand"']