# [실습] 그래프에 메모리 추가하기   

랭그래프의 메모리 기능을 통해, 이전의 실행 내용을 기억하도록 설정할 수 있습니다.   

세션 내부의 단기 기억에 해당하는 Checkpoint와, 장기 기억에 해당하는 Storage를 사용합니다.
<br><br>

**이번 실습은 GPU가 필요합니다. 오른쪽 탭의 "런타임 유형 변경"을 통해 T4 GPU로 체크하고 진행해 주세요!**






기본 라이브러리를 설치합니다.

In [None]:
!pip install langgraph langchain langchain_google_genai langchain_community langgraph-checkpoint-sqlite rich

API 키와 LLM을 구성합니다.

In [None]:
import os
os.environ['GOOGLE_API_KEY'] = 'AIxxx'

from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_google_genai import ChatGoogleGenerativeAI

# Gemini API는 분당 10개 요청으로 제한
# 즉, 초당 약 0.167개 요청 (10/60)
rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.167,  # 분당 10개 요청
    check_every_n_seconds=0.1,  # 100ms마다 체크
    max_bucket_size=10,  # 최대 버스트 크기
)

llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash",
    rate_limiter=rate_limiter,
    # temperature
    # max_tokens

    thinking_budget = 500  # 추론(Reasoning) 토큰 길이 제한
)

중간 과정 확인을 위해, LangSmith를 연동합니다.
https://smith.langchain.com 에서 등록 후 작성합니다.

In [None]:
os.environ['LANGCHAIN_API_KEY'] = ''
os.environ['LANGCHAIN_PROJECT'] = 'LangGraph_FastCampus'
os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
os.environ['LANGCHAIN_TRACING_V2']='true'

## Checkpoint(체크포인트)

체크포인트는 그래프 실행의 상태와 중간 단계를 저장하고 관리합니다.    

<br><br>체크포인트를 저장하기 위해 MemorySaver를 구성합니다.

In [None]:
# 메모리 세팅
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

구성된 체크포인터는 `compile`에 포함하여 실행하면 됩니다.

In [None]:
from typing import Annotated
from typing_extensions import TypedDict, List
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, state, START

class State(TypedDict):
    messages: Annotated[list, add_messages]
    foo: str


def call_model(state: State):
    response = llm.invoke(state["messages"])
    return {"messages": response}


builder = StateGraph(State)

builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=memory)

graph
# memory에 기록 저장

체크포인터에서, 각각의 실행은 `thread`라는 이름으로 구분됩니다.   
thread_id를 활용해, 이전 출력의 정보를 저장할 수 있습니다.

In [None]:
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

# thread_id를 통해 매 실행의 체크포인트 저장
config = {"configurable": {"thread_id": "1"}}


input_message = HumanMessage(input())

for chunk in graph.stream({"messages": [input_message]},
                          config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

In [None]:
input_message = {"role": "user", "content": "내가 뭐라고 했지?"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

해당 체크포인트의 내용은 `thread_id`에 귀속됩니다.

In [None]:
config = {"configurable": {"thread_id": "2"}}

input_message = {"role": "user", "content": "내가 뭐라고 했지?"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

`get_state`를 통해 진행된 스레드의 마지막 상태를 볼 수 있습니다.   
해당 상태에는 State의 스냅샷 정보가 저장되어 있습니다.

In [None]:
from rich import print as rprint
config = {"configurable": {"thread_id": "1"}}

rprint(graph.get_state(config))

`get_state_history()`는 전체 스냅샷 목록을 순서대로 보여줍니다.

In [None]:
history = list(graph.get_state_history(config))
history

In [None]:
for i in range(len(history)):
    print(history[i].config, len(history[i].values['messages']))
    # 메시지 개수와 config 정보

## Replay

위에서 얻은 Snapshot을 활용하면, 특정 시점부터 시작하는 것도 가능합니다.   

In [None]:
history[1].values

HumanMessage의 특정 시점부터 재시작합니다.

In [None]:
graph.invoke(None, config = history[1].config)

Replay를 수행하면, graph의 history에는 해당 실행 결과가 추가됩니다.

In [None]:
history = list(graph.get_state_history(config))
history

특정 스냅샷의 값을 수정할 수도 있습니다. 이 경우, Reducer의 정의에 맞게 처리됩니다.

In [None]:
graph.update_state(config = history[0].config, values= {'messages':'저녁 뭐 먹을까?'})

In [None]:
history = list(graph.get_state_history(config))
rprint(history[0])

In [None]:
graph.invoke(None, config = history[0].config)

## SQLite를 이용한 Checkpointing

`SqliteSaver`를 사용해, 체크포인팅을 저장할 수 있습니다.

In [None]:
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
checkpointer = SqliteSaver(conn)

graph = builder.compile(checkpointer=checkpointer)


In [None]:
config = {"configurable": {"thread_id": "1"}}


input_message = HumanMessage(input())

for chunk in graph.stream({"messages": [input_message]},
                          config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

In [None]:
input_message = {"role": "user", "content": "오늘 저녁은 뭐 먹을까?"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

## Store를 이용한 장기 메모리 추가하기   

Thread Checkpoint는 단일 세션의 메모리를 저장하기에는 효과적이지만,   
세션이 유지되지 않는 경우에는 다른 방법이 필요합니다.   

InMemoryStore를 통해, 장기 메모리를 추가할 수 있습니다.


In [None]:
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

Namespace를 통해 메모리 위치를 설정합니다.

In [None]:
namespace_for_memory = ('1', 'memories')

In [None]:
import uuid
memory_id = str(uuid.uuid4())

memory = {'information' : "지금은 랭그래프 메모리 실습 중입니다."}
in_memory_store.put(namespace_for_memory, memory_id, memory)
memories = in_memory_store.search(namespace_for_memory)
memories

In [None]:
memory = {'name' : "FastCampus"}
memory_id = str(uuid.uuid4())

in_memory_store.put(namespace_for_memory, memory_id, memory)
# memory_id에 매핑되는 구조

memories = in_memory_store.search(namespace_for_memory)
memories


In [None]:
# 메모리 레코드 추가하기
memory_id = str(uuid.uuid4())
new_memory  = {'name' : "LangGraph"}

in_memory_store.put(namespace_for_memory, memory_id, new_memory)
# memory_id에 매핑되는 구조

memories = in_memory_store.search(namespace_for_memory)
memories

In [None]:
# 메모리 덮어씌우기
new_memory  = {'name' : "패스트캠퍼스"}

in_memory_store.put(namespace_for_memory, memory_id, new_memory)
# memory_id에 매핑되는 구조

memories = in_memory_store.search(namespace_for_memory)
memories

In [None]:
memories[-1].dict()

## memory 검색    
메모리에 임베딩 모델을 설정하면 임베딩 기반의 인덱싱/검색이 가능합니다.    
주로 사용할 수 있는 다국어 임베딩은 다음과 같습니다.   
1. 온라인/ OpenAI의 `text-embedding-3-small`, `text-embedding-3-large` (유료)
2. 온라인/ Google `gemini-embedding-exp-03-07` (유료, 무료 제한 1일 100개)
3. 오프라인/ HuggingFace에서 지원하는 `bge-m3`, `multilingual-e5`, `KURE-v1`    

이번 실습에서는 가장 작은 임베딩 모델 중 하나인 `text-embedding-3-small`를 사용합니다.   

**실제 임베딩 모델을 선정하실 때는 GPU 자원에 따라 MTEB 리더보드 (https://huggingface.co/spaces/mteb/leaderboard) 에서 한국어 성능 순위를 참고하시는 것을 추천합니다!**

In [None]:
!pip install langchain-huggingface sentence-transformers

HuggingFace에서 임베딩을 다운로드하여 로컬 폴더에 저장합니다.

In [None]:
from sentence_transformers import SentenceTransformer

# HuggingFace 임베딩 주소 지정하기
# nlpai-lab/KURE-v1 , baai/bge-m3, 등의 주소를 입력하여 지정

model_name = 'intfloat/multilingual-e5-small'

# CPU 설정으로 모델 불러오기
# device='cuda'로 변경하면 GPU에서 로드
emb_model = SentenceTransformer(model_name, device='cpu')

# 로컬 폴더에 모델 저장하기
emb_model.save('./embedding')
del emb_model

import gc
gc.collect()
# 메모리 정리하기

In [None]:
from langchain_huggingface import HuggingFaceEmbeddings

# 허깅페이스 포맷의 임베딩 모델 불러오기
embeddings = HuggingFaceEmbeddings(model_name= './embedding',
                                   model_kwargs={'device':'cuda'})
# GPU가 있는 경우, CUDA(GPU) 설정

In [None]:
store = InMemoryStore(
    index={
        "embed": embeddings,
        "dims": 1024, # 임베딩 차원(모델마다 다름)
        "fields": ["comments"] # 임베딩을 활용해 저장할 필드 지정
    }
)

In [None]:
memory_id

In [None]:
namespace_for_memory = ('1', 'information')
memory_id = str(uuid.uuid4())

store.put(namespace_for_memory,
          memory_id,
          {'ID':'JohnDoe1', 'comments':'안녕하세요! 저는 John Doe이고, 23살입니다.'})

store.search(namespace_for_memory, limit=20)

In [None]:
def add_records(store, namespace, data_records):
    for data in data_records:
        store.put(namespace, str(uuid.uuid4()), data)

example_data = [
    {'Hello':'world'},
    {'ID':'Admin', 'comments':'이 계정은 운영 전용 계정입니다.'},
    {'ID':'GPT_Agent1', 'comments':'GPT-4o Agent 전용 계정'},
    {'ID':'Gemini_Agent1', 'comments':'Gemini-2.0-flash Agent 전용 계정'}
]

add_records(store,namespace_for_memory, example_data)


history = store.search(namespace_for_memory)
history

In [None]:
print('\n'.join(['['+str(i.created_at)[0:10] + '] ' + i.value['comments'] if 'comments' in i.value else '' for i in history]))

In [None]:
result = store.search(
    namespace_for_memory,
    query = 'Gemini Agent 버전은 뭔가요?',
    limit = 2
)
result

In [None]:
type(result[0])

이렇게 생성된 store는 각 Node에 State와 함께 포함할 수 있습니다.  
실제 채팅 시나리오를 통해 구현해 보겠습니다.

In [None]:
from langgraph.checkpoint.sqlite import SqliteSaver


memory = MemorySaver()
in_memory_store = InMemoryStore()

In [None]:
from typing import Annotated
from typing_extensions import TypedDict, List
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, RemoveMessage
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, state, START, END
from langgraph.store.base import BaseStore
from langchain_core.runnables import RunnableConfig


class State(TypedDict):
    messages: Annotated[list, add_messages]
    summary: str



def call_model(state: State, config: RunnableConfig, store: BaseStore):

    user_id = config['configurable']['user_id']
    namespace = (user_id, "information")

    def gen_memories(history):
        return '\n'.join(['['+str(i.created_at)[0:10] + '] ' + i.value['memory'] if 'memory' in i.value else '' for i in history])

    summary = gen_memories(store.search(namespace))

    prompt = [SystemMessage(content=f"""
당신은 고객 상담 챗봇입니다.
고객은 이전에 아래의 상담을 한 적이 있습니다. 과거 내역을 참고하여 응답하세요.
첫 대화는 친절한 인사로 시작하세요.
---
Previous Conversation Summary: {summary}"""), HumanMessage(content="안녕하세요!")] + state['messages']

     # 과거 정보를 상위 Context에 포함하기
    response = llm.invoke(prompt)
    print('Assistant:', response.content)

    user_message = HumanMessage(content = input())


    return {"messages": [response, user_message]}

def summarize_conversation(state: State, config: RunnableConfig, store: BaseStore):

    summary_prompt = """현재까지의 대화 내용을 요약하여 데이터베이스에 저장하려고 합니다.
이전의 Summary 내용과 이어지는 경우, 해당 내용을 포함하여 요약하세요.
만약 새로운 주제인 경우, 새로운 대화 내용만 요약하세요.
대화 내용을 3~5문장 길이로 요약하세요. 요약 이외의 다른 내용은 출력하지 마세요.
또한, 요약을 요청했다는 사실도 작성하지 마세요.
"""
    prompt = state['messages'][:-1] + [HumanMessage(content = summary_prompt)]
    # END 제외

    response = llm.invoke(prompt)

    delete_messages = [RemoveMessage(id=m.id) for m in state['messages']]
    return {'summary': response.content, 'messages':delete_messages}



def update_memory(state:State, config: RunnableConfig, store:BaseStore):

    user_id = config['configurable']['user_id']
    namespace = (user_id, "information")

    memory_id = str(uuid.uuid4())


    memory = state["summary"]

    store.put(namespace, memory_id, {'memory':memory})

def check_end(state:State):
    last_msg = state["messages"][-1]

    if last_msg.content.upper()=='END':
        return 'END'
    return 'RESUME'

builder = StateGraph(State)


builder.add_node('update_memory', update_memory)
builder.add_node("call_model", call_model)
builder.add_node('summarize_conversation', summarize_conversation)

builder.add_edge(START, "call_model")
builder.add_conditional_edges('call_model',
                              check_end,
                              {'END':'summarize_conversation',
                               'RESUME':'call_model'})
builder.add_edge('summarize_conversation','update_memory')
builder.add_edge('update_memory', END)

graph = builder.compile(checkpointer=memory, store=in_memory_store)

graph
# memory에 기록 저장

In [None]:
print('END를 입력하면 대화가 종료됩니다.')

config = {'configurable':{'thread_id':'2', 'user_id':'admin'}}
for data in graph.stream(
    {'messages':[]},
    config,stream_mode="updates"):
    print(data)

In [None]:
config = {'configurable':{'thread_id':'8', 'user_id':'admin'}}
graph.invoke({'messages':[]}, config)

In [None]:
graph.get_state(config=config)

In [None]:
in_memory_store.list_namespaces()

In [None]:
in_memory_store.search(('HH', 'information'))

In [None]:
# 다른 thread id로 해도 이전 정보를 참고함
config = {'configurable':{'thread_id':'9', 'user_id':'admin'}}
for data in graph.stream(
    {'messages':[]},
    config,stream_mode="updates"):
    print(data)

# SQLite Store 사용하기

외부 DB를 연동하는 기능은 현재 직접 지원하지 않으며, BaseStore Class를 이용해 구현해야 합니다.   
PostGresStore를 사용하는 경우: https://langchain-ai.github.io/langgraph/reference/store/#langgraph.store.postgres.PostgresStore

In [None]:
from langgraph.store.base import BaseStore, SearchItem
from typing import List, Optional, Dict, Any, Tuple
import sqlite3
import json
from datetime import datetime
import uuid
import numpy as np
from sentence_transformers import SentenceTransformer

class SQLiteStore(BaseStore):
    def __init__(self, db_path: str = "store.db", embedding_model: str = 'intfloat/multilingual-e5-large'):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.embedding_model = embedding_model
        self._setup()

    def _setup(self):
        """데이터베이스 테이블 생성"""
        cursor = self.conn.cursor()

        # 메인 스토어 테이블
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS store (
                namespace TEXT,
                key TEXT,
                value TEXT,
                created_at TEXT,
                updated_at TEXT,
                PRIMARY KEY (namespace, key)
            )
        """)

        # 임베딩 테이블
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                namespace TEXT,
                key TEXT,
                embedding BLOB,
                text TEXT,
                PRIMARY KEY (namespace, key),
                FOREIGN KEY (namespace, key) REFERENCES store(namespace, key)
            )
        """)

        self.conn.commit()

    def _get_embedding(self, text: str) -> np.ndarray:
        """텍스트의 임베딩 벡터 생성"""
        model = SentenceTransformer(self.embedding_model)
        return model.encode(text)

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """코사인 유사도 계산"""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


    def batch(self, operations: List[Tuple[str, Tuple[str, ...], str, Dict[str, Any]]]) -> None:
        """일괄 작업 처리"""
        for op, namespace, key, value in operations:
            if op == "put":
                self.put(namespace, key, value)
            elif op == "delete":
                self.delete(namespace, key)
    async def abatch(self, operations: List[Tuple[str, Tuple[str, ...], str, Dict[str, Any]]]) -> None:
        """비동기 일괄 작업 처리"""
        self.batch(operations)  # SQLite는 기본적으로 동기식이므로 동기 메서드를 호출




    def put(self, namespace: Tuple[str, ...], key: str, value: Dict[str, Any], index: bool = True) -> None:
        """데이터 저장"""
        namespace_str = json.dumps(namespace)
        value_str = json.dumps(value)
        now = datetime.utcnow().isoformat()

        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO store (namespace, key, value, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
        """, (namespace_str, key, value_str, now, now))

        # 임베딩 저장
        if index:
            # value에서 텍스트 추출 (예: 'memory' 필드)
            text_to_embed = value.get('memory', '') if isinstance(value, dict) else str(value)
            if text_to_embed:
                embedding = self._get_embedding(text_to_embed)
                cursor.execute("""
                    INSERT OR REPLACE INTO embeddings (namespace, key, embedding, text)
                    VALUES (?, ?, ?, ?)
                """, (namespace_str, key, embedding.tobytes(), text_to_embed))

        self.conn.commit()

    def get(self, namespace: Tuple[str, ...], key: str) -> Optional[Dict[str, Any]]:
        """데이터 조회"""
        namespace_str = json.dumps(namespace)
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT value FROM store WHERE namespace = ? AND key = ?
        """, (namespace_str, key))
        result = cursor.fetchone()

        if result:
            return json.loads(result[0])
        return None

    def search(self, namespace: Tuple[str, ...], query: Optional[str] = None, limit: int = 20) -> List[SearchItem]:
        """데이터 검색 (임베딩 기반 또는 텍스트 기반)"""
        namespace_str = json.dumps(namespace)
        cursor = self.conn.cursor()

        if query:
            # 임베딩 기반 검색
            query_embedding = self._get_embedding(query)

            # 모든 임베딩 가져오기
            cursor.execute("""
                SELECT e.embedding, e.text, s.key, s.value, s.created_at, s.updated_at
                FROM embeddings e
                JOIN store s ON e.namespace = s.namespace AND e.key = s.key
                WHERE e.namespace = ?
            """, (namespace_str,))

            results = []
            for row in cursor.fetchall():
                embedding = np.frombuffer(row[0], dtype=np.float32)
                similarity = self._cosine_similarity(query_embedding, embedding)
                results.append(SearchItem(
                key=row[2],          # key는 세 번째 컬럼
                value=json.loads(row[3]),  # value는 네 번째 컬럼
                created_at=datetime.fromisoformat(row[4]),  # created_at은 다섯 번째 컬럼
                updated_at=datetime.fromisoformat(row[5]),  # updated_at은 여섯 번째 컬럼
                namespace=namespace,
                score = similarity
            ))

            # 유사도 기준으로 정렬
            results.sort(key=lambda x: x.score, reverse=True)
            return results[:limit]
        else:
            # 일반 텍스트 검색
            cursor.execute("""
                SELECT key, value, created_at, updated_at
                FROM store
                WHERE namespace = ?
                ORDER BY created_at DESC LIMIT ?
            """, (namespace_str, limit))

            results = []
            for row in cursor.fetchall():
                value = json.loads(row[1])
                results.append(SearchItem(
                key=row[0],
                value=value,
                created_at=datetime.fromisoformat(row[2]),
                updated_at=datetime.fromisoformat(row[3]),
                namespace=namespace  # 원래 search 메서드에 전달된 namespace 사용
                ))

            return results

    def delete(self, namespace: Tuple[str, ...], key: str) -> None:
        """데이터 삭제"""
        namespace_str = json.dumps(namespace)
        cursor = self.conn.cursor()

        # 임베딩도 함께 삭제
        cursor.execute("""
            DELETE FROM embeddings WHERE namespace = ? AND key = ?
        """, (namespace_str, key))

        cursor.execute("""
            DELETE FROM store WHERE namespace = ? AND key = ?
        """, (namespace_str, key))

        self.conn.commit()

    def list_namespaces(self) -> List[Tuple[str, ...]]:
        """네임스페이스 목록 조회"""
        cursor = self.conn.cursor()
        cursor.execute("SELECT DISTINCT namespace FROM store")
        return [tuple(json.loads(row[0])) for row in cursor.fetchall()]

In [None]:
graph

In [None]:
# SQLite 스토어 인스턴스 생성
sqlite_store = SQLiteStore(db_path = 'example.db', embedding_model = model_name)

graph = builder.compile(checkpointer=memory, store=sqlite_store)

print('END를 입력하면 대화가 종료됩니다.')

config = {'configurable':{'thread_id':'1', 'user_id':'admin'}}
for data in graph.stream(
    {'messages':[]},
    config,stream_mode="updates"):
    print(data)

In [None]:
sqlite_store.list_namespaces()

In [None]:
memories = sqlite_store.search(('admin', 'information'))
for result in memories:
    print(result)

In [None]:
# 임베딩 검색 사용 예시
# memory에서 값을 검색하도록 설정(SQLite Store Class 참고)

results = sqlite_store.search(('admin', 'information'), query="코딩", limit=1)
for result in results:
    print(result)


Langsmith (https://smith.langchain.com )에서 실행 결과를 확인할 수 있습니다.