Merged

Changes from all commits (52 commits)
11b63e6
debug an error function name
tangg555 Oct 20, 2025
72e8f39
feat: Add DynamicCache compatibility for different transformers versions
tangg555 Oct 20, 2025
5702870
feat: implement APIAnalyzerForScheduler for memory operations
tangg555 Oct 21, 2025
4655b41
feat: Add search_ws API endpoint and enhance API analyzer functionality
tangg555 Oct 21, 2025
c20736c
fix: resolve test failures and warnings in test suite
tangg555 Oct 21, 2025
da72e7e
feat: add a test_robustness execution to test thread pool execution
tangg555 Oct 21, 2025
5b9b1e4
feat: optimize scheduler configuration and API search functionality
tangg555 Oct 22, 2025
6dac11e
feat: Add Redis auto-initialization with fallback strategies
tangg555 Oct 22, 2025
a207bf4
feat: add database connection management to ORM module
tangg555 Oct 24, 2025
8c1cc04
remove part of test
tangg555 Oct 24, 2025
f2b0da4
feat: add Redis-based ORM with multiprocess synchronization
tangg555 Oct 24, 2025
f0e8aab
fix: resolve scheduler module import and Redis integration issues
tangg555 Oct 24, 2025
731f00d
revise naive memcube creation in server router
tangg555 Oct 25, 2025
6d442fb
remove long-time tests in test_scheduler
tangg555 Oct 25, 2025
157f858
remove redis test which needs .env
tangg555 Oct 25, 2025
c483011
refactor all codes about mixture search with scheduler
tangg555 Oct 25, 2025
b81b82e
fix: resolve Redis API synchronization issues and implement search AP…
tangg555 Oct 26, 2025
90d1a0b
remove a test for api module
tangg555 Oct 26, 2025
1de72cf
revise to pass the test suite
tangg555 Oct 26, 2025
c72858e
addressed all conflicts
tangg555 Oct 27, 2025
3245376
address some bugs to make mix_search run normally
tangg555 Oct 27, 2025
57482cf
modify codes according to evaluation logs
tangg555 Oct 27, 2025
e4b8313
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 27, 2025
011d248
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
8c8d672
feat: Optimize mixture search and enhance API client
tangg555 Oct 28, 2025
aabad8d
feat: Add conversation_turn tracking for session-based memory search
tangg555 Oct 28, 2025
3faa5c3
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
c6376cd
address time bug in monitor
tangg555 Oct 29, 2025
bd0b234
revise simple tree
tangg555 Oct 29, 2025
5332d12
add mode to evaluation client; rewrite print to logger.info in db files
tangg555 Oct 29, 2025
aee13ba
feat: 1. add redis queue for scheduler 2. finish the code related to …
tangg555 Nov 5, 2025
f957967
debug the working memory code
tangg555 Nov 5, 2025
f520cca
addressed conflicts to merge
tangg555 Nov 5, 2025
a3f6636
addressed a range of bugs to make the scheduler run correctly
tangg555 Nov 5, 2025
47e9851
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 5, 2025
161af12
remove test_dispatch_parallel test
tangg555 Nov 5, 2025
1d8d14b
print change to logger.info
tangg555 Nov 5, 2025
00e3a75
addressed conflicts
tangg555 Nov 6, 2025
2852e56
adjusted the core code related to fine and mixture APIs
tangg555 Nov 17, 2025
5d3cf45
addressed conflicts
tangg555 Nov 17, 2025
ab71f17
feat: create task queue to wrap local queue and redis queue. queue no…
tangg555 Nov 18, 2025
7665cda
fix bugs: debug bugs about internet trigger
tangg555 Nov 18, 2025
3559323
debug get searcher mode
tangg555 Nov 18, 2025
7c8e0d0
feat: add manual internet
fridayL Nov 18, 2025
27b0971
Merge branch 'feat/redis_scheduler' of https://github.com/MemTensor/M…
fridayL Nov 18, 2025
94d456b
Fix: fix code format
fridayL Nov 18, 2025
87b5358
feat: add strategy for fine search
tangg555 Nov 18, 2025
127fdc7
debug redis queue
tangg555 Nov 18, 2025
0911ced
debug redis queue
tangg555 Nov 18, 2025
d1a7261
fix bugs: completely addressed bugs about redis queue
tangg555 Nov 18, 2025
232be6f
refactor: add searcher to handler_init; remove info log from task_queue
tangg555 Nov 19, 2025
d16a7c8
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 19, 2025
9 changes: 3 additions & 6 deletions examples/mem_scheduler/api_w_scheduler.py
@@ -25,9 +25,10 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
     print(f"My test handler received {len(messages)} messages:")
     for msg in messages:
         print(f" my_test_handler - {msg.item_id}: {msg.content}")
-    print(
-        f"{queue._redis_conn.xinfo_groups(queue.stream_key_prefix)} qsize: {queue.qsize()} messages:{messages}"
+    user_status_running = handle_scheduler_status(
+        user_name=USER_MEM_CUBE, mem_scheduler=mem_scheduler, instance_id="api_w_scheduler"
     )
+    print(f"[Monitor] Status for {USER_MEM_CUBE} after submit:", user_status_running)


 # 2. Register the handler
@@ -59,10 +60,6 @@ def my_test_handler(messages: list[ScheduleMessageItem]):

 # 5.1 Monitor status for specific mem_cube while running
 USER_MEM_CUBE = "test_mem_cube"
-user_status_running = handle_scheduler_status(
-    user_name=USER_MEM_CUBE, mem_scheduler=mem_scheduler, instance_id="api_w_scheduler"
-)
-print(f"[Monitor] Status for {USER_MEM_CUBE} after submit:", user_status_running)

 # 6. Wait for messages to be processed (limited to 100 checks)
 print("Waiting for messages to be consumed (max 100 checks)...")
8 changes: 8 additions & 0 deletions src/memos/api/handlers/base_handler.py
@@ -9,6 +9,7 @@

 from memos.log import get_logger
 from memos.mem_scheduler.base_scheduler import BaseScheduler
+from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher


 logger = get_logger(__name__)
@@ -28,6 +29,7 @@ def __init__(
         naive_mem_cube: Any | None = None,
         mem_reader: Any | None = None,
         mem_scheduler: Any | None = None,
+        searcher: Any | None = None,
         embedder: Any | None = None,
         reranker: Any | None = None,
         graph_db: Any | None = None,
@@ -58,6 +60,7 @@ def __init__(
         self.naive_mem_cube = naive_mem_cube
         self.mem_reader = mem_reader
         self.mem_scheduler = mem_scheduler
+        self.searcher = searcher
         self.embedder = embedder
         self.reranker = reranker
         self.graph_db = graph_db
@@ -128,6 +131,11 @@ def mem_scheduler(self) -> BaseScheduler:
         """Get scheduler instance."""
         return self.deps.mem_scheduler

+    @property
+    def searcher(self) -> Searcher:
+        """Get searcher instance."""
+        return self.deps.searcher
+
     @property
     def embedder(self):
         """Get embedder instance."""
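The new dependency follows the pattern already used for mem_scheduler: the Searcher is stored on HandlerDependencies and surfaced through a read-only property, so concrete handlers can call self.searcher directly. A minimal sketch of the consumer side, assuming the import path implied by this file layout; the subclass itself is hypothetical:

```python
# Hypothetical handler subclass; only the dependency/property pattern
# comes from the diff above.
from memos.api.handlers.base_handler import BaseHandler, HandlerDependencies  # assumed import path


class ExampleSearchHandler(BaseHandler):
    def __init__(self, dependencies: HandlerDependencies):
        super().__init__(dependencies)
        # Fail fast at construction time if the shared Searcher was not injected.
        self._validate_dependencies("naive_mem_cube", "mem_scheduler", "searcher")

    def retrieve_for(self, query: str, mem_cube_id: str) -> list:
        # self.searcher resolves to self.deps.searcher via the new property.
        # (Additional keyword arguments of Searcher.retrieve are elided here.)
        return self.searcher.retrieve(query=query, user_name=mem_cube_id, top_k=5)
```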
22 changes: 17 additions & 5 deletions src/memos/api/handlers/component_init.py
@@ -5,6 +5,8 @@
 including databases, LLMs, memory systems, and schedulers.
 """

+import os
+
 from typing import TYPE_CHECKING, Any

 from memos.api.config import APIConfig
@@ -38,6 +40,10 @@
 from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
 from memos.memories.textual.simple_tree import SimpleTreeTextMemory
 from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
+
+
+if TYPE_CHECKING:
+    from memos.memories.textual.tree import TreeTextMemory
 from memos.memories.textual.tree_text_memory.retrieve.internet_retriever_factory import (
     InternetRetrieverFactory,
 )
@@ -47,7 +53,7 @@

 if TYPE_CHECKING:
     from memos.mem_scheduler.optimized_scheduler import OptimizedScheduler
-
+from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher
 logger = get_logger(__name__)


@@ -205,6 +211,13 @@ def init_server() -> dict[str, Any]:

     logger.debug("MemCube created")

+    tree_mem: TreeTextMemory = naive_mem_cube.text_mem
+    searcher: Searcher = tree_mem.get_searcher(
+        manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
+        moscube=False,
+    )
+    logger.debug("Searcher created")
+
     # Initialize Scheduler
     scheduler_config_dict = APIConfig.get_scheduler_config()
     scheduler_config = SchedulerConfigFactory(
@@ -217,16 +230,14 @@ def init_server() -> dict[str, Any]:
         db_engine=BaseDBManager.create_default_sqlite_engine(),
         mem_reader=mem_reader,
     )
-    mem_scheduler.init_mem_cube(mem_cube=naive_mem_cube)
+    mem_scheduler.init_mem_cube(mem_cube=naive_mem_cube, searcher=searcher)
     logger.debug("Scheduler initialized")

     # Initialize SchedulerAPIModule
     api_module = mem_scheduler.api_module

     # Start scheduler if enabled
-    import os
-
-    if os.getenv("API_SCHEDULER_ON", True):
+    if os.getenv("API_SCHEDULER_ON", "true").lower() == "true":
         mem_scheduler.start()
         logger.info("Scheduler started")
@@ -253,6 +264,7 @@ def init_server() -> dict[str, Any]:
         "mos_server": mos_server,
         "mem_scheduler": mem_scheduler,
         "naive_mem_cube": naive_mem_cube,
+        "searcher": searcher,
         "api_module": api_module,
         "vector_db": vector_db,
         "pref_extractor": pref_extractor,
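The API_SCHEDULER_ON change also fixes a real truthiness bug: os.getenv returns a string whenever the variable is set, so the old expression os.getenv("API_SCHEDULER_ON", True) was truthy for any non-empty value, including "false" and "0". A standalone snippet illustrating the difference:

```python
import os

# Old check: a set-but-falsey value still enables the scheduler,
# because any non-empty string is truthy.
os.environ["API_SCHEDULER_ON"] = "false"
old_enabled = bool(os.getenv("API_SCHEDULER_ON", True))
assert old_enabled is True  # "false" slipped through

# New check: only the literal string "true" (case-insensitive) enables it.
new_enabled = os.getenv("API_SCHEDULER_ON", "true").lower() == "true"
assert new_enabled is False
```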
39 changes: 24 additions & 15 deletions src/memos/api/handlers/search_handler.py
@@ -18,7 +18,7 @@
 from memos.api.product_models import APISearchRequest, SearchResponse
 from memos.context.context import ContextThreadPoolExecutor
 from memos.log import get_logger
-from memos.mem_scheduler.schemas.general_schemas import SearchMode
+from memos.mem_scheduler.schemas.general_schemas import FINE_STRATEGY, FineStrategy, SearchMode
 from memos.types import MOSSearchResult, UserContext


@@ -40,7 +40,7 @@ def __init__(self, dependencies: HandlerDependencies):
             dependencies: HandlerDependencies instance
         """
         super().__init__(dependencies)
-        self._validate_dependencies("naive_mem_cube", "mem_scheduler")
+        self._validate_dependencies("naive_mem_cube", "mem_scheduler", "searcher")

     def handle_search_memories(self, search_req: APISearchRequest) -> SearchResponse:
         """
@@ -211,11 +211,17 @@ def _fast_search(

         return formatted_memories

+    def _deep_search(
+        self, search_req: APISearchRequest, user_context: UserContext, max_thinking_depth: int
+    ) -> list:
+        logger.error("waiting to be implemented")
+        return []
+
     def _fine_search(
         self,
         search_req: APISearchRequest,
         user_context: UserContext,
-    ) -> list:
+    ) -> list[str]:
         """
         Fine-grained search with query enhancement.

@@ -226,19 +232,22 @@ def _fine_search(
         Returns:
             List of enhanced search results
         """
+        if FINE_STRATEGY == FineStrategy.DEEP_SEARCH:
+            return self._deep_search(
+                search_req=search_req, user_context=user_context, max_thinking_depth=3
+            )
+
         target_session_id = search_req.session_id or "default_session"
         search_filter = {"session_id": search_req.session_id} if search_req.session_id else None

-        searcher = self.mem_scheduler.searcher
-
         info = {
             "user_id": search_req.user_id,
             "session_id": target_session_id,
             "chat_history": search_req.chat_history,
         }

         # Fine retrieve
-        fast_retrieved_memories = searcher.retrieve(
+        raw_retrieved_memories = self.searcher.retrieve(
             query=search_req.query,
             user_name=user_context.mem_cube_id,
             top_k=search_req.top_k,
@@ -250,8 +259,8 @@ def _fine_search(
         )

         # Post retrieve
-        fast_memories = searcher.post_retrieve(
-            retrieved_results=fast_retrieved_memories,
+        raw_memories = self.searcher.post_retrieve(
+            retrieved_results=raw_retrieved_memories,
             top_k=search_req.top_k,
             user_name=user_context.mem_cube_id,
             info=info,
@@ -260,22 +269,22 @@ def _fine_search(
         # Enhance with query
         enhanced_memories, _ = self.mem_scheduler.retriever.enhance_memories_with_query(
             query_history=[search_req.query],
-            memories=fast_memories,
+            memories=raw_memories,
         )

-        if len(enhanced_memories) < len(fast_memories):
+        if len(enhanced_memories) < len(raw_memories):
             logger.info(
-                f"Enhanced memories ({len(enhanced_memories)}) are less than fast memories ({len(fast_memories)}). Recalling for more."
+                f"Enhanced memories ({len(enhanced_memories)}) are less than raw memories ({len(raw_memories)}). Recalling for more."
             )
             missing_info_hint, trigger = self.mem_scheduler.retriever.recall_for_missing_memories(
                 query=search_req.query,
-                memories=fast_memories,
+                memories=raw_memories,
             )
-            retrieval_size = len(fast_memories) - len(enhanced_memories)
+            retrieval_size = len(raw_memories) - len(enhanced_memories)
             logger.info(f"Retrieval size: {retrieval_size}")
             if trigger:
                 logger.info(f"Triggering additional search with hint: {missing_info_hint}")
-                additional_memories = searcher.search(
+                additional_memories = self.searcher.search(
                     query=missing_info_hint,
                     user_name=user_context.mem_cube_id,
                     top_k=retrieval_size,
@@ -286,7 +295,7 @@ def _fine_search(
                 )
             else:
                 logger.info("Not triggering additional search, using fast memories.")
-                additional_memories = fast_memories[:retrieval_size]
+                additional_memories = raw_memories[:retrieval_size]

             enhanced_memories += additional_memories
             logger.info(
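The renamed variables make the top-up invariant easier to follow: query enhancement may drop memories, and the handler refills exactly the number dropped, either via a hint-driven re-search (when recall_for_missing_memories triggers) or by padding from the leftover raw results. A toy illustration of the size bookkeeping, with plain strings standing in for memory items and a hypothetical search_with_hint helper for the triggered branch:

```python
raw_memories = ["m1", "m2", "m3", "m4", "m5"]   # post_retrieve output
enhanced_memories = ["m1", "m4"]                # survivors of query enhancement

# Refill exactly the number of slots that enhancement dropped.
retrieval_size = len(raw_memories) - len(enhanced_memories)  # 3

trigger = False  # assume recall_for_missing_memories did not trigger
if trigger:
    additional = search_with_hint("missing info hint", top_k=retrieval_size)  # hypothetical helper
else:
    additional = raw_memories[:retrieval_size]  # pad from the front of the raw list

enhanced_memories += additional
assert len(enhanced_memories) == len(raw_memories)
```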
19 changes: 13 additions & 6 deletions src/memos/mem_scheduler/base_scheduler.py
@@ -54,11 +54,11 @@
 from memos.memories.activation.kv import KVCacheMemory
 from memos.memories.activation.vllmkv import VLLMKVCacheItem, VLLMKVCacheMemory
 from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory
+from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher
 from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE


 if TYPE_CHECKING:
-    from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher
     from memos.reranker.http_bge import HTTPBGEReranker

@@ -141,14 +141,21 @@ def __init__(self, config: BaseSchedulerConfig):
         self.auth_config = None
         self.rabbitmq_config = None

-    def init_mem_cube(self, mem_cube):
+    def init_mem_cube(
+        self,
+        mem_cube: BaseMemCube,
+        searcher: Searcher | None = None,
+    ):
         self.mem_cube = mem_cube
         self.text_mem: TreeTextMemory = self.mem_cube.text_mem
-        self.searcher: Searcher = self.text_mem.get_searcher(
-            manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
-            moscube=False,
-        )
         self.reranker: HTTPBGEReranker = self.text_mem.reranker
+        if searcher is None:
+            self.searcher: Searcher = self.text_mem.get_searcher(
+                manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
+                moscube=False,
+            )
+        else:
+            self.searcher = searcher

     def initialize_modules(
         self,
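The new optional parameter turns init_mem_cube into an injection point: init_server (see component_init.py above) builds one Searcher and shares it between the API handlers and the scheduler, while existing callers that pass only mem_cube keep the old lazily-built behavior. Both call styles, condensed into a sketch that assumes the naive_mem_cube and mem_scheduler objects from init_server:

```python
# Injected: handlers and scheduler share a single Searcher instance.
searcher = naive_mem_cube.text_mem.get_searcher(
    manual_close_internet=False,  # sketch value; the real code derives this from ENABLE_INTERNET
    moscube=False,
)
mem_scheduler.init_mem_cube(mem_cube=naive_mem_cube, searcher=searcher)

# Legacy: the scheduler builds its own Searcher, exactly as before this change.
mem_scheduler.init_mem_cube(mem_cube=naive_mem_cube)
```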
1 change: 1 addition & 0 deletions src/memos/mem_scheduler/schemas/general_schemas.py
@@ -18,6 +18,7 @@ class FineStrategy(str, Enum):

     REWRITE = "rewrite"
     RECREATE = "recreate"
+    DEEP_SEARCH = "deep_search"


 FILE_PATH = Path(__file__).absolute()
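Because FineStrategy subclasses str, the new member compares equal to its raw string value, which keeps the FINE_STRATEGY dispatch in search_handler.py cheap and log-friendly. A self-contained sketch; how FINE_STRATEGY gets chosen is not shown in this diff, so the env-var wiring below is an assumption:

```python
import os
from enum import Enum


class FineStrategy(str, Enum):
    REWRITE = "rewrite"
    RECREATE = "recreate"
    DEEP_SEARCH = "deep_search"


# Assumed wiring: the diff only shows FINE_STRATEGY being imported elsewhere.
FINE_STRATEGY = FineStrategy(os.getenv("FINE_STRATEGY", FineStrategy.REWRITE))

assert FineStrategy.DEEP_SEARCH == "deep_search"  # str-enum equality
if FINE_STRATEGY == FineStrategy.DEEP_SEARCH:
    print("dispatching to _deep_search")
```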
@@ -150,7 +150,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
             self.metrics.on_done(label=m.label, mem_cube_id=m.mem_cube_id, now=time.time())

             # acknowledge redis messages
-
             if self.use_redis_queue and self.memos_message_queue is not None:
                 for msg in messages:
                     redis_message_id = msg.redis_message_id
5 changes: 1 addition & 4 deletions src/memos/mem_scheduler/task_schedule_modules/local_queue.py
@@ -79,8 +79,7 @@ def put(

     def get(
         self,
-        user_id: str,
-        mem_cube_id: str,
+        stream_key: str,
         block: bool = True,
         timeout: float | None = None,
         batch_size: int | None = None,
@@ -91,8 +90,6 @@ def get(
             )
             return []

-        stream_key = self.get_stream_key(user_id=user_id, mem_cube_id=mem_cube_id)
-
         # Return empty list if queue does not exist
         if stream_key not in self.queue_streams:
             logger.error(f"Stream {stream_key} does not exist when trying to get messages.")
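Both queue backends now accept a precomputed stream_key instead of (user_id, mem_cube_id), moving key construction to the call site so callers can resolve the key once and reuse it across repeated polls. A hedged sketch of the before/after call pattern; get_stream_key exists on the queue classes per the removed line, while the surrounding caller is illustrative:

```python
# Before this change: the queue re-derived the stream key on every call.
# messages = queue.get(user_id="u1", mem_cube_id="cube1", block=False)

# After: resolve the key once, then poll with it.
stream_key = queue.get_stream_key(user_id="u1", mem_cube_id="cube1")
messages = queue.get(stream_key, block=True, timeout=1.0, batch_size=10)
```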
32 changes: 17 additions & 15 deletions src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
@@ -5,6 +5,7 @@
 the local memos_message_queue functionality in BaseScheduler.
 """

+import re
 import time

 from collections.abc import Callable
@@ -165,8 +166,7 @@ def ack_message(self, user_id, mem_cube_id, redis_message_id) -> None:

     def get(
         self,
-        user_id: str,
-        mem_cube_id: str,
+        stream_key: str,
         block: bool = True,
         timeout: float | None = None,
         batch_size: int | None = None,
@@ -175,8 +175,6 @@ def get(
             raise ConnectionError("Not connected to Redis. Redis connection not available.")

         try:
-            stream_key = self.get_stream_key(user_id=user_id, mem_cube_id=mem_cube_id)
-
             # Calculate timeout for Redis
             redis_timeout = None
             if block and timeout is not None:
@@ -295,17 +293,21 @@ def get_stream_keys(self) -> list[str]:
         if not self._redis_conn:
             return []

-        try:
-            # Use match parameter and decode byte strings to regular strings
-            stream_keys = [
-                key.decode("utf-8") if isinstance(key, bytes) else key
-                for key in self._redis_conn.scan_iter(match=f"{self.stream_key_prefix}:*")
-            ]
-            logger.debug(f"get stream_keys from redis: {stream_keys}")
-            return stream_keys
-        except Exception as e:
-            logger.error(f"Failed to list Redis stream keys: {e}")
-            return []
+        # First, get all keys that might match (using Redis pattern matching)
+        redis_pattern = f"{self.stream_key_prefix}:*"
+        raw_keys = [
+            key.decode("utf-8") if isinstance(key, bytes) else key
+            for key in self._redis_conn.scan_iter(match=redis_pattern)
+        ]
+
+        # Second, filter using a Python regex to ensure an exact prefix match:
+        # escape special regex characters in the prefix, then anchor on the following ":"
+        escaped_prefix = re.escape(self.stream_key_prefix)
+        regex_pattern = f"^{escaped_prefix}:"
+        stream_keys = [key for key in raw_keys if re.match(regex_pattern, key)]
+
+        logger.debug(f"get stream_keys from redis: {stream_keys}")
+        return stream_keys

     def size(self) -> int:
         """
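The two-step scan in get_stream_keys guards against Redis glob semantics: SCAN MATCH treats *, ?, and [...] as pattern syntax, so a prefix containing those characters can match keys outside the intended namespace, while re.escape makes the Python-side check strictly literal. A standalone illustration with an invented prefix:

```python
import re

stream_key_prefix = "scheduler[dev]"  # invented prefix containing glob metacharacters

# Candidate keys, e.g. as returned by a loose scan:
raw_keys = [
    "scheduler[dev]:u1:cube1",  # exact literal prefix: keep
    "schedulerd:u2:cube2",      # the glob "scheduler[dev]:*" also matches the single char "d"
]

regex_pattern = f"^{re.escape(stream_key_prefix)}:"
stream_keys = [key for key in raw_keys if re.match(regex_pattern, key)]
assert stream_keys == ["scheduler[dev]:u1:cube1"]
```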