In [4]:
from dotenv import load_dotenv

load_dotenv()

### Long Term Memory

In [5]:
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
user_id = "my-user"
application_context = "chitchat"
namespace = (user_id, application_context)
store.put(namespace, "a-memory", {"rules": ["User likes short, direct language", "User only speaks English & python"], "my-key": "my-value"})
store.put(namespace, "another-memory", {"rules": ["User prefers concise answers"], "my-key": "my-value"})


In [6]:
store.get(namespace, "a-memory").value

In [7]:
results = store.search(namespace, filter={"my-key": "my-value"})

In [8]:
for item in results:
    print(item.value)

In [9]:
import uuid
from typing import Literal
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import BaseStore
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode

@tool
def get_weather(location: str):
    """Get the weather at a specific location"""
    if location.lower() in ["munich"]:
        return "It's 15 degrees Celsius and cloudy."
    else:
        return "It's 32 degrees Celsius and sunny."

tools = [get_weather]
model = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
store = InMemoryStore()

def call_model(state: MessagesState, config: dict, *, store: BaseStore):
    user_id = config.get("configurable", {}).get("user_id", "default-user")
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    info = "\n".join([d.value["data"] for d in memories])
    system_msg = f"You are a helpful assistant."
    if info:
        system_msg += f" User info:\n{info}"
    print("System Message:", system_msg)
    messages = state["messages"]
    last_message = messages[-1]
    if "remember" in last_message.content.lower():
        memory_content = last_message.content.lower().split("remember", 1)[1].strip()
        if memory_content:
            memory = memory_content
            store.put(namespace, str(uuid.uuid4()), {"data": memory})
    model_input_messages = [SystemMessage(content=system_msg)] + messages
    response = model.invoke(model_input_messages)
    return {"messages": [response]}

def should_continue(state: MessagesState) -> Literal["tools", END]:
    messages = state['messages']
    last_message = messages[-1]
    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        return "tools"
    return END

In [10]:
checkpointer = MemorySaver()
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
tool_node = ToolNode(tools)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
)
workflow.add_edge("tools", 'agent')
graph = workflow.compile(checkpointer=checkpointer, store=store)

### Get Information from across multiple threads

In [11]:
graph.invoke(
    {"messages": [HumanMessage(content="Remember my name is Alice.")]},
    config={"configurable": {"user_id": "user123", "thread_id": 1}}
)

In [12]:
graph.invoke(
    {"messages": [HumanMessage(content="What is my name?")]},
    config={"configurable": {"user_id": "user123", "thread_id": 2}}
)

### PostGres-Store for persistance

In [17]:
import psycopg2
from psycopg2.extras import Json
from typing import Iterable
from datetime import datetime, timezone
from langgraph.store.base import (
    BaseStore,
    GetOp,
    Item,
    ListNamespacesOp,
    Op,
    PutOp,
    Result,
    SearchOp,
)

class PostgresStore(BaseStore):
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._ensure_schema()

    def _connect(self):
        return psycopg2.connect(self.dsn)

    def _ensure_schema(self):
        with self._connect() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    CREATE TABLE IF NOT EXISTS kv_store (
                        namespace TEXT NOT NULL,
                        key TEXT NOT NULL,
                        value JSONB NOT NULL,
                        created_at TIMESTAMPTZ NOT NULL,
                        updated_at TIMESTAMPTZ NOT NULL,
                        PRIMARY KEY (namespace, key)
                    )
                    """
                )
                conn.commit()

    def batch(self, ops: Iterable[Op]) -> list[Result]:
        results = []
        with self._connect() as conn:
            with conn.cursor() as cursor:
                for op in ops:
                    if isinstance(op, GetOp):
                        cursor.execute(
                            """
                            SELECT value, created_at, updated_at
                            FROM kv_store
                            WHERE namespace = %s AND key = %s
                            """,
                            (".".join(op.namespace), op.key),
                        )
                        row = cursor.fetchone()
                        if row:
                            value, created_at, updated_at = row
                            results.append(
                                Item(
                                    value=value,
                                    key=op.key,
                                    namespace=op.namespace,
                                    created_at=created_at,
                                    updated_at=updated_at,
                                )
                            )
                        else:
                            results.append(None)

                    elif isinstance(op, SearchOp):
                        namespace_prefix = ".".join(op.namespace_prefix)
                        cursor.execute(
                            """
                            SELECT key, value, created_at, updated_at
                            FROM kv_store
                            WHERE namespace LIKE %s
                            LIMIT %s OFFSET %s
                            """,
                            (namespace_prefix + "%", op.limit, op.offset),
                        )
                        rows = cursor.fetchall()
                        results.append(
                            [
                                Item(
                                    value=row[1],
                                    key=row[0],
                                    namespace=tuple(namespace_prefix.split(".")),
                                    created_at=row[2],
                                    updated_at=row[3],
                                )
                                for row in rows
                            ]
                        )

                    elif isinstance(op, PutOp):
                        if op.value is None:
                            cursor.execute(
                                """
                                DELETE FROM kv_store
                                WHERE namespace = %s AND key = %s
                                """,
                                (".".join(op.namespace), op.key),
                            )
                        else:
                            cursor.execute(
                                """
                                INSERT INTO kv_store (namespace, key, value, created_at, updated_at)
                                VALUES (%s, %s, %s, %s, %s)
                                ON CONFLICT (namespace, key)
                                DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at
                                """,
                                (
                                    ".".join(op.namespace),
                                    op.key,
                                    Json(op.value),
                                    datetime.now(timezone.utc),
                                    datetime.now(timezone.utc),
                                ),
                            )
                        results.append(None)

                    elif isinstance(op, ListNamespacesOp):
                        cursor.execute(
                            """
                            SELECT DISTINCT namespace
                            FROM kv_store
                            LIMIT %s OFFSET %s
                            """,
                            (op.limit, op.offset),
                        )
                        rows = cursor.fetchall()
                        results.append([tuple(row[0].split(".")) for row in rows])

                conn.commit()
        return results

    async def abatch(self, ops: Iterable[Op]) -> list[Result]:
        return self.batch(ops)


In [None]:
dsn = "postgresql://postgres:postgres@localhost:5433/postgres"
postgres_store = PostgresStore(dsn)

In [15]:
graph = workflow.compile(checkpointer=checkpointer, store=postgres_store)

In [16]:
graph.invoke(
    {"messages": [HumanMessage(content="Remember my name is Alice.")]},
    config={"configurable": {"user_id": "user123", "thread_id": 3}}
)

graph.invoke(
    {"messages": [HumanMessage(content="What is my name?")]},
    config={"configurable": {"user_id": "user123", "thread_id": 4}}
)