# Memory Store 使用指南

# 1. 初始化


In [3]:
import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os

endpoint = os.getenv("tablestore_end_point")
instance_name = os.getenv("tablestore_instance_name")
access_key_id = os.getenv("tablestore_access_key_id")
access_key_secret = os.getenv("tablestore_access_key_secret")

# 创建 tablestore 的 sdk client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# 根据 Session 的更新时间 进行 list_recent_sessions 时候，需要返回哪些字段?
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# session 表的 多元索引的meta信息，能进行meta的多字段自由组合查询。例如搜索session的title
# 如果无需该功能，不需要传该参数
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# message 表的 多元索引的 meta 信息，能进行 meta 的多字段自由组合查询。例如搜索 message 的 content 中包含“你好”的消息记录
# content 字段默认会创建索引，这里仅需设置meta字段即可。
# 如果无需该功能，不需要传该参数
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

In [4]:
# 创建表(包括二级索引)，能完成 Memory 场景的核心能力
# 仅需执行一次
memory_store.init_table()

# 创建多元索引，能进行meta的多字段自由组合查询，需要在上面声明一些meta字段。后续索引结构可以在控制台上修改。
memory_store.init_search_index()

In [3]:
# 删除表（内部接口，仅供测试使用）
memory_store._delete_table()

# 2. Session 会话管理

一个Session属于一个User，一个User有n个Session，每个Session有n个聊天消息Message，本章节主要介绍Session的管理

## 2.1 声明 Session

In [5]:
from tablestore_for_agent_memory.base.common import microseconds_timestamp
from tablestore_for_agent_memory.base.base_memory_store import Session

# 声明一个Session（user_id和session_id联合起来确认唯一一个session）,推荐使用uuid当做session_id
session = Session(user_id="1", session_id="2")
# 指定Session的更新时间(可以不写，默认是微妙时间戳)
session.update_time = microseconds_timestamp()
# 给这个Session附加一些meta信息,支持的类型包括以下:
session.metadata["meta_string"] = "4"
session.metadata["meta_long"] = 5
session.metadata["meta_double"] = 6.6
session.metadata["meta_boolean"] = True
session.metadata["meta_bytes"] = bytearray("China", encoding="utf8")

## 2.2 创建、更新、查询、删除 Session


In [6]:
# 覆盖写
memory_store.put_session(session)
# 查询
session = memory_store.get_session(user_id="1", session_id="2")
print(session)

Session(user_id='1', session_id='2', update_time=1744869710786900, metadata={'meta_boolean': True, 'meta_bytes': bytearray(b'China'), 'meta_double': 6.6, 'meta_long': 5, 'meta_string': '4'})


In [7]:
session.metadata["meta_string"] = "updated"
# 增量更新(不存在则创建新的，普通写入推荐使用put_session)
memory_store.update_session(session)
print(memory_store.get_session(user_id="1", session_id="2"))

Session(user_id='1', session_id='2', update_time=1744869710786900, metadata={'meta_boolean': True, 'meta_bytes': bytearray(b'China'), 'meta_double': 6.6, 'meta_long': 5, 'meta_string': 'updated'})


In [8]:
# 删除
memory_store.delete_session(user_id="1", session_id="2")
print(memory_store.get_session(user_id="1", session_id="2"))

None


## 2.3 展示最近活跃的Session

根据 session 更新时间`session.update_time` 展示最近活跃列表。

### 2.3.1 写入样例数据
 

In [31]:
from faker import Faker
import random

fk_data = Faker(locale="zh_CN")


def random_session(user_id=fk_data.user_name()) -> Session:
    session = Session(user_id=user_id, session_id=fk_data.uuid4())
    session.metadata["title"] = random.choice(["abc", "def", "ghi", "abcd", "abcdef", "abcgh"])
    session.metadata["meta_string"] = fk_data.name()
    session.metadata["meta_long"] = random.randint(0, 9999999999999)
    session.metadata["meta_double"] = random.uniform(0.0, 1.0)
    session.metadata["meta_boolean"] = random.choice([True, False])
    session.metadata["meta_bytes"] = bytearray(fk_data.name(), encoding="utf8")
    return session


total_count = 100
for i in range(total_count):
    user_id = random.choice(["1", "2"])
    session = random_session(user_id=user_id)
    session.update_time = random.randint(1, 100)
    memory_store.put_session(session)

### 2.3.2 展示某一个用户最近的活跃会话列表


In [None]:
# 方式1：接口返回 iterator 迭代器
iterator = memory_store.list_recent_sessions(user_id="1")
for session in iterator:
    print(session)

In [2]:
# 方式2：token分页返回数据（该token可以传递给前端，方便进行按批次分页查询）
response = memory_store.list_recent_sessions_paginated(user_id="1", page_size=10, next_token=None)
print(len(response.hits))
print(response.next_token)
# 翻页查询（使用上一次结果里返回的token，传递给下次使用）
response = memory_store.list_recent_sessions_paginated(user_id="1", page_size=10, next_token=response.next_token)
print(len(response.hits))
print(response.next_token)
# 后续循环此方式，直到token为None表示查询结束。

10
W1sidXNlcl9pZCIsICIxIl0sIFsidXBkYXRlX3RpbWUiLCA3NF0sIFsic2Vzc2lvbl9pZCIsICJlYTk4OTdmNy00NTQ2LTQ3NmEtOWFmNi1iYWM0YmZlMzFkMDUiXV0=
10
W1sidXNlcl9pZCIsICIxIl0sIFsidXBkYXRlX3RpbWUiLCA1MF0sIFsic2Vzc2lvbl9pZCIsICI1OWYyMTJkMi1lNTMxLTQ0MWItODU3ZC00ODAyM2JiMTNmODYiXV0=


In [12]:
# 将 iterator 迭代器转成list使用
sessions = list(memory_store.list_recent_sessions(user_id="1"))
print(len(sessions))

# 仅获取其前 5 个值。推荐使用该方法，因为用户量大了，列出所有session会越来越慢，因此仅需列出其最活跃的即可。
sessions = list(memory_store.list_recent_sessions(user_id="1", max_count=5))
print(len(sessions))

# 获取前 5 个
sessions = list(memory_store.list_recent_sessions(user_id="1", max_count=5))
print(len(sessions))

57
5
5


### 2.3.3 根据 metadata 进行 filter

In [13]:
from tablestore_for_agent_memory.base.filter import Filters

# 使用 单个 Filter 进行过滤
sessions = list(memory_store.list_recent_sessions(user_id="1", metadata_filter=Filters.eq("meta_boolean", True)))
print(len(sessions))

26


In [14]:
from tablestore_for_agent_memory.base.filter import Filters

# 使用 多个 Filter 进行过滤
sessions = list(
    memory_store.list_recent_sessions(
        user_id="1",
        metadata_filter=Filters.logical_and([Filters.gt("meta_double", 0.5), Filters.eq("meta_boolean", True)]),
    )
)
print(len(sessions))

13


### 2.3.4 设置查询Session的截止时间

In [15]:
sessions = list(
    memory_store.list_recent_sessions(
        user_id="1",
        inclusive_end_update_time=50,
        metadata_filter=Filters.eq("meta_boolean", True),
    )
)
print(len(sessions))
print([item.update_time for item in sessions])

16
[97, 95, 94, 93, 89, 87, 87, 69, 67, 61, 58, 56, 55, 55, 52, 51]


## 2.4 Search 搜索 Meta 字段


In [16]:
all_sessions = []
# 查询 user_id=1的所有记录，使用 next_token 进行翻页
response = memory_store.search_sessions(metadata_filter=Filters.eq("user_id", "1"), limit=3)
sessions, next_token = (response.hits, response.next_token)
all_sessions.extend(sessions)
print(len(sessions))
print(next_token)
# 连续翻页示例
next_token = None
while True:
    response = memory_store.search_sessions(metadata_filter=Filters.eq("user_id", "1"), limit=3, next_token=next_token)
    sessions, next_token = (response.hits, response.next_token)
    all_sessions.extend(sessions)
    # 当 next_token 为 None 时候，表示翻页结束
    if next_token is None:
        break
print("all_sessions:", len(all_sessions))

3
CAESBgoEIgIIABgAIlUKUwNOAAAAMVMzMS5TMzE2MjM4Mzg2MzMwNjE2NDJkNjEzNzM2NjIyZDM0MzM2MjMyMmQ2MjM3MzczNzJkNjIzMzMxMzU2NTYzNjMzNzMzMzUzNTY0
all_sessions: 60


In [17]:
# 全文检索 session 的 title 字段，需要包含 "ab". 如果需要翻页，可以将next_token传给前端，以便下次进行连续翻页查询，请参考上述的 while true 示例。
response = memory_store.search_sessions(
    metadata_filter=Filters.logical_and([Filters.eq("user_id", "1"), Filters.text_match_phrase("title", "ab")]),
    limit=100,
)
sessions, next_token = (response.hits, response.next_token)
print(len(sessions))

39


# 3. Message 聊天消息

一个Session有n个聊天消息Message，本章节主要介绍Message的使用。

## 3.1 声明 Message


In [18]:
from faker import Faker

fk_data = Faker(locale="zh_CN")

from tablestore_for_agent_memory.base.base_memory_store import Message
import random

# 创建一条message（session_id 和 message_id 联合起来确认唯一一行数据）
message = Message(
    session_id="推荐uuid当做session_id",
    message_id="可以使用uuid或其他业务string当做message_id",
    create_time=random.randint(
        0, 999999999
    ),  # create_time 不可变，用来确认这条消息在一个会话(session)里的位置信息(或者叫排序信息)
)

message.content = fk_data.text(20)
message.metadata["meta_string"] = fk_data.name_male()
message.metadata["meta_long"] = random.randint(0, 999999999)
message.metadata["meta_double"] = random.uniform(1.0, 2.0)
message.metadata["meta_boolean"] = random.choice([True, False])
message.metadata["meta_bytes"] = bytearray(fk_data.city_name(), encoding="utf8")

## 3.2 写入、查询、更新、删除一条聊天记录

In [19]:
# 写入
memory_store.put_message(message)

# 查询 (不指定create_time)
message_read = memory_store.get_message(session_id=message.session_id, message_id=message.message_id)
print(message_read)

# 查询 (指定create_time，性能更好，查单条message场景的QPS较高的用户可以传。因该领域大多数业务查询单条聊天消息的场景或qps很少，所以不带也没关系，大多数用户的场景是list出来一批最近的聊天消息，而不是查询单条)
message_read = memory_store.get_message(
    session_id=message.session_id, message_id=message.message_id, create_time=message.create_time
)
print(message_read)

# 更新
message.content = "update to: 123"
memory_store.update_message(message)
message_read_after_update = memory_store.get_message(
    session_id=message.session_id, message_id=message.message_id, create_time=message.create_time
)
print(message_read_after_update)

# 删除 (不指定create_time)
memory_store.delete_message(session_id=message.session_id, message_id=message.message_id)
# 删除（(指定create_time，性能更好，原因参考上述查询接口描述）
memory_store.delete_message(
    session_id=message.session_id, message_id=message.message_id, create_time=message.create_time
)
# 删除后查询不到
print(memory_store.get_message(session_id=message.session_id, message_id=message.message_id))

Message(session_id='推荐uuid当做session_id', message_id='可以使用uuid或其他业务string当做message_id', create_time=783174296, content='成功政府地方的是类别汽车资料状态他的.', metadata={'meta_boolean': False, 'meta_bytes': bytearray(b'\xe8\xbe\x9b\xe9\x9b\x86'), 'meta_double': 1.9802964476651883, 'meta_long': 584474057, 'meta_string': '萧海燕'})
Message(session_id='推荐uuid当做session_id', message_id='可以使用uuid或其他业务string当做message_id', create_time=783174296, content='成功政府地方的是类别汽车资料状态他的.', metadata={'meta_boolean': False, 'meta_bytes': bytearray(b'\xe8\xbe\x9b\xe9\x9b\x86'), 'meta_double': 1.9802964476651883, 'meta_long': 584474057, 'meta_string': '萧海燕'})
Message(session_id='推荐uuid当做session_id', message_id='可以使用uuid或其他业务string当做message_id', create_time=783174296, content='update to: 123', metadata={'meta_boolean': False, 'meta_bytes': bytearray(b'\xe8\xbe\x9b\xe9\x9b\x86'), 'meta_double': 1.9802964476651883, 'meta_long': 584474057, 'meta_string': '萧海燕'})
None


## 3.3 获取某一个session会话的聊天消息

### 3.3.1 写入样例数据

In [20]:
from faker import Faker
import random

fk_data = Faker(locale="zh_CN")


def random_message(session_id=fk_data.user_name()) -> Message:
    message = Message(session_id=session_id, message_id=fk_data.uuid4())
    message.create_time = random.randint(0, 999999999)
    message.content = " ".join(
        random.choices(
            ["abc", "def", "ghi", "abcd", "adef", "abcgh", "apple", "banana", "cherry"], k=random.randint(1, 10)
        )
    )
    message.metadata["meta_string"] = fk_data.name_male()
    message.metadata["meta_long"] = random.randint(0, 999999999)
    message.metadata["meta_double"] = random.uniform(1.0, 2.0)
    message.metadata["meta_boolean"] = random.choice([True, False])
    message.metadata["meta_bytes"] = bytearray(fk_data.city_name(), encoding="utf8")
    return message


total_count = 50
for i in range(total_count):
    session_id = random.choice(["1", "2"])
    message = random_message(session_id=session_id)
    message.create_time = random.randint(1, 100)
    memory_store.put_message(message)


### 3.3.2 获取所有消息
根据业务方的经验，在大模型聊天场景下，越早的消息其实不太需要的，仅需要最近的个别几条即可，因此更推荐下一小章节的“获取最近的部分消息”。

In [21]:
# 获取某一个session_id的全部聊天消息（默认根据 create_time 从最新到旧排序，接口返回 iterator 迭代器）
iterator = memory_store.list_messages(session_id="1")
# 将 iterator 迭代器 转为list使用
messages = list(iterator)
print(len(messages))

23


In [22]:
from tablestore_for_agent_memory.base.filter import Filters

# 根据 metadata Filter 获取某一个session的全部数据
messages = list(
    memory_store.list_messages(
        session_id="1",
        metadata_filter=Filters.logical_and([Filters.gt("meta_double", 0.5), Filters.eq("meta_boolean", True)]),
    )
)
print(len(messages))

12


In [23]:
# 所有session的所有message（注意，量可能较大，请使用迭代器访问，不要放到一个list里）
iterator = memory_store.list_all_messages()

### 3.3.3 获取最近的部分消息
#### 3.3.3.1 获取最近的 3 条消息

In [24]:
iterator = memory_store.list_messages(session_id="1", max_count=3)
messages = list(iterator)
print(len(messages))

# 根据 metadata 过滤，返回最近的3条消息
messages = list(
    memory_store.list_messages(session_id="1", metadata_filter=Filters.eq("meta_boolean", True), max_count=3)
)
print(len(messages))

3
3



#### 3.3.3.2 按照时间获取


In [25]:
from tablestore_for_agent_memory.base.common import Order

# 获取 session_id=“1” 且创建时间在10~50范围内的数据，排序方式为正序(create_time 从最旧到新)
messages = list(
    memory_store.list_messages(
        session_id="1",
        inclusive_start_create_time=10,
        inclusive_end_create_time=50,
        order=Order.ASC,
    )
)
print(len(messages))

# 获取 session_id=“1” 且创建时间在10~80范围内的数据，排序方式为逆序(create_time 从最新到旧)，因此起始时间是80，结束时间是10.
messages = list(
    memory_store.list_messages(
        session_id="1",
        inclusive_start_create_time=80,
        inclusive_end_create_time=10,
        order=Order.DESC,
    )
)
print(len(messages))

# 获取 session_id=“1” 且创建时间在10~80范围内的数据，排序方式为逆序(create_time 从最新到旧),同时指定 metadata filter
messages = list(
    memory_store.list_messages(
        session_id="1",
        inclusive_start_create_time=80,
        inclusive_end_create_time=10,
        order=Order.DESC,
        metadata_filter=Filters.logical_and([Filters.gt("meta_double", 0.5), Filters.eq("meta_boolean", True)]),
    )
)
print(len(messages))

13
17
9


## 3.4 Search 搜索 Meta 信息

In [26]:
all_messages = []
# 查询 session_id=1的所有记录，使用 next_token 进行翻页
response = memory_store.search_messages(metadata_filter=Filters.eq("session_id", "1"), limit=3)
messages, next_token = (response.hits, response.next_token)
all_messages.extend(messages)
print(len(messages))
print(next_token)
# 连续翻页示例
next_token = None
while True:
    response = memory_store.search_sessions(metadata_filter=Filters.eq("user_id", "1"), limit=3, next_token=next_token)
    messages, next_token = (response.hits, response.next_token)
    all_messages.extend(messages)
    # 当 next_token 为 None 时候，表示翻页结束
    if next_token is None:
        break
print("all_messages:", len(all_messages))

3
CAESBgoEIgIIABgAImcKZQNgAAAAMVMzMS5MODAwMDAwMDAwMDAwMDAwNy5TNjUzMDM5MzE2NDMwNjE2MTJkNjUzNDMzNjQyZDM0MzIzNjYxMmQzOTM5MzA2MjJkMzEzMTM2NjQzNDYyNjQ2MjYxMzE2MTYx
all_messages: 60


In [27]:
# 全文检索 message 的 content 字段，需要包含 "abc". 如果需要翻页，可以将next_token传给前端，以便下次进行连续翻页查询，请参考上述的 while true 示例。
response = memory_store.search_messages(
    metadata_filter=Filters.logical_and([Filters.eq("session_id", "1"), Filters.text_match("content", "abc")]),
    limit=100,
)
messages, next_token = (response.hits, response.next_token)
print(len(messages))

7


## 3.5 批量删除 message

In [28]:
# 删除一个session的全部message
memory_store.delete_messages(session_id="1")

In [29]:
# 删除全部session的全部message（注意：高危）
memory_store.delete_all_messages()

# 4. 实践

按照用户和大模型交互的流程，将session和message进行实战应用。

## 4.1 新用户进来

In [None]:
# 创建一个session
session = Session(user_id="1", session_id="session_id_1")
session.update_time = microseconds_timestamp()
session.metadata["meta_使用哪个模型"] = "qwen.5"
memory_store.put_message(session)

# 用户提问：你好，帮我讲个笑话
message = Message(session_id="session_id_1", message_id="message_id_1", create_time=microseconds_timestamp())
message.content = "你好，帮我讲个笑话"
message.metadata["meta_访问来源"] = "web"
message.metadata["meta_消息分类"] = "用户"
memory_store.put_message(message)  # 记录用户消息
session.update_time = microseconds_timestamp()
memory_store.update_session(session)  # 记录用户消息时候，需要同时更新session信息，这里仅以更新update_time为例。

# 大模型返回：小白＋小白=? 小白兔(two)
message = Message(
    session_id="session_id_1", message_id="message_id_2", create_time=microseconds_timestamp()  # 消息id改变
)
message.content = "小白＋小白=? 小白兔(two)"
message.metadata["meta_消息分类"] = "大模型"
memory_store.put_message(message)  # 记录大模型消息

# 用户提问：再来一个
message = Message(
    session_id="session_id_1", message_id="message_id_3", create_time=microseconds_timestamp()  # 消息id改变
)
message.content = "再来一个"
message.metadata["meta_访问来源"] = "web"
message.metadata["meta_消息分类"] = "用户"
memory_store.put_message(message)
session.update_time = microseconds_timestamp()
memory_store.update_session(session)  # 记录用户消息时候，需要同时更新session信息，这里仅以更新update_time为例。
# 查出来上下文消息，也告诉大模型，这样大模型才知道“”
related_messages = list(memory_store.list_messages(session_id="session_id_1", max_count=3))

# 大模型返回：有一个躲猫猫社团，他们团长现在还没找到。
message = Message(
    session_id="session_id_1", message_id="message_id_4", create_time=microseconds_timestamp()  # 消息id改变
)
message.content = "小白＋小白=? 小白兔(two)"
message.metadata["meta_消息分类"] = "大模型"
memory_store.put_message(message)  # 记录大模型消息

## 4.2 老用户继续使用历史会话

In [None]:
# 展示出有哪些历史会话
sessions = list(memory_store.list_recent_sessions(user_id="1", max_count=5))

# 用户点击某一个会话session，这时候可以查询这个session详细信息(因为list最近会话session，有可能meta不全,如有必要刻意考虑再拿一次完整信息)
session = memory_store.get_session(user_id="1", session_id="session_id_1")

# 查出来当前session所有消息进行展示
all_messages = memory_store.list_messages(session_id="session_id_1")

# 后续继续与大模型提问交互即可