Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
11b63e6
fix an erroneous function name
tangg555 Oct 20, 2025
72e8f39
feat: Add DynamicCache compatibility for different transformers versions
tangg555 Oct 20, 2025
5702870
feat: implement APIAnalyzerForScheduler for memory operations
tangg555 Oct 21, 2025
4655b41
feat: Add search_ws API endpoint and enhance API analyzer functionality
tangg555 Oct 21, 2025
c20736c
fix: resolve test failures and warnings in test suite
tangg555 Oct 21, 2025
da72e7e
feat: add a test_robustness execution to test thread pool execution
tangg555 Oct 21, 2025
5b9b1e4
feat: optimize scheduler configuration and API search functionality
tangg555 Oct 22, 2025
6dac11e
feat: Add Redis auto-initialization with fallback strategies
tangg555 Oct 22, 2025
a207bf4
feat: add database connection management to ORM module
tangg555 Oct 24, 2025
8c1cc04
remove part of test
tangg555 Oct 24, 2025
f2b0da4
feat: add Redis-based ORM with multiprocess synchronization
tangg555 Oct 24, 2025
f0e8aab
fix: resolve scheduler module import and Redis integration issues
tangg555 Oct 24, 2025
731f00d
revise naive memcube creation in server router
tangg555 Oct 25, 2025
6d442fb
remove long-time tests in test_scheduler
tangg555 Oct 25, 2025
157f858
remove redis test which needs .env
tangg555 Oct 25, 2025
c483011
refactor all codes about mixture search with scheduler
tangg555 Oct 25, 2025
b81b82e
fix: resolve Redis API synchronization issues and implement search AP…
tangg555 Oct 26, 2025
90d1a0b
remove a test for api module
tangg555 Oct 26, 2025
1de72cf
revise to pass the test suite
tangg555 Oct 26, 2025
c72858e
addressed all conflicts
tangg555 Oct 27, 2025
3245376
address some bugs to make mix_search normally running
tangg555 Oct 27, 2025
57482cf
modify codes according to evaluation logs
tangg555 Oct 27, 2025
e4b8313
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 27, 2025
011d248
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
8c8d672
feat: Optimize mixture search and enhance API client
tangg555 Oct 28, 2025
aabad8d
feat: Add conversation_turn tracking for session-based memory search
tangg555 Oct 28, 2025
3faa5c3
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
c6376cd
address time bug in monitor
tangg555 Oct 29, 2025
bd0b234
revise simple tree
tangg555 Oct 29, 2025
5332d12
add mode to evaluation client; rewrite print to logger.info in db files
tangg555 Oct 29, 2025
e7e7bd8
Merge branch 'dev' into dev
CaralHsi Oct 29, 2025
4968009
Merge branch 'dev' into dev
fridayL Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions evaluation/scripts/utils/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def search(self, query, user_id, top_k):
"mem_cube_id": user_id,
"conversation_id": "",
"top_k": top_k,
"mode": "fast",
"mode": os.getenv("SEARCH_MODE", "fast"),
"handle_pref_mem": False,
},
ensure_ascii=False,
Expand Down Expand Up @@ -232,7 +232,7 @@ def search(self, query, user_id, top_k):
"query": query,
"user_id": user_id,
"memory_limit_number": top_k,
"mode": "mixture",
"mode": os.getenv("SEARCH_MODE", "fast"),
}
)

Expand Down
2 changes: 1 addition & 1 deletion src/memos/graph_dbs/neo4j.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ def drop_database(self) -> None:

with self.driver.session(database=self.system_db_name) as session:
session.run(f"DROP DATABASE {self.db_name} IF EXISTS")
print(f"Database '{self.db_name}' has been dropped.")
logger.info(f"Database '{self.db_name}' has been dropped.")
else:
raise ValueError(
f"Refusing to drop protected database: {self.db_name} in "
Expand Down
91 changes: 27 additions & 64 deletions src/memos/graph_dbs/polardb.py

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING

from sqlalchemy.engine import Engine

Expand Down Expand Up @@ -50,6 +51,10 @@
from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE


if TYPE_CHECKING:
from memos.mem_cube.base import BaseMemCube


logger = get_logger(__name__)


Expand Down Expand Up @@ -124,7 +129,7 @@ def __init__(self, config: BaseSchedulerConfig):
self._context_lock = threading.Lock()
self.current_user_id: UserID | str | None = None
self.current_mem_cube_id: MemCubeID | str | None = None
self.current_mem_cube: GeneralMemCube | None = None
self.current_mem_cube: BaseMemCube | None = None
self.auth_config_path: str | Path | None = self.config.get("auth_config_path", None)
self.auth_config = None
self.rabbitmq_config = None
Expand Down
60 changes: 26 additions & 34 deletions src/memos/mem_scheduler/general_modules/api_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,27 @@
APISearchHistoryManager,
TaskRunningStatus,
)
from memos.mem_scheduler.utils.db_utils import get_utc_now
from memos.memories.textual.item import TextualMemoryItem


logger = get_logger(__name__)


class SchedulerAPIModule(BaseSchedulerModule):
def __init__(self, window_size=5):
def __init__(self, window_size: int | None = None, history_memory_turns: int | None = None):
super().__init__()
self.window_size = window_size
self.history_memory_turns = history_memory_turns
self.search_history_managers: dict[str, APIRedisDBManager] = {}
self.pre_memory_turns = 5

def get_search_history_manager(self, user_id: str, mem_cube_id: str) -> APIRedisDBManager:
"""Get or create a Redis manager for search history."""
logger.info(
f"Getting search history manager for user_id: {user_id}, mem_cube_id: {mem_cube_id}"
)
key = f"search_history:{user_id}:{mem_cube_id}"
if key not in self.search_history_managers:
logger.info(f"Creating new search history manager for key: {key}")
self.search_history_managers[key] = APIRedisDBManager(
user_id=user_id,
mem_cube_id=mem_cube_id,
Expand All @@ -41,8 +44,12 @@ def sync_search_data(
query: str,
memories: list[TextualMemoryItem],
formatted_memories: Any,
conversation_id: str | None = None,
session_id: str | None = None,
conversation_turn: int = 0,
) -> Any:
logger.info(
f"Syncing search data for item_id: {item_id}, user_id: {user_id}, mem_cube_id: {mem_cube_id}"
)
# Get the search history manager
manager = self.get_search_history_manager(user_id, mem_cube_id)
manager.sync_with_redis(size_limit=self.window_size)
Expand All @@ -59,7 +66,7 @@ def sync_search_data(
query=query,
formatted_memories=formatted_memories,
task_status=TaskRunningStatus.COMPLETED, # Use the provided running_status
conversation_id=conversation_id,
session_id=session_id,
memories=memories,
)

Expand All @@ -69,18 +76,18 @@ def sync_search_data(
logger.warning(f"Failed to update entry with item_id: {item_id}")
else:
# Add new entry based on running_status
search_entry = APIMemoryHistoryEntryItem(
entry_item = APIMemoryHistoryEntryItem(
item_id=item_id,
query=query,
formatted_memories=formatted_memories,
memories=memories,
task_status=TaskRunningStatus.COMPLETED,
conversation_id=conversation_id,
created_time=get_utc_now(),
session_id=session_id,
conversation_turn=conversation_turn,
)

# Add directly to completed list as APIMemoryHistoryEntryItem instance
search_history.completed_entries.append(search_entry)
search_history.completed_entries.append(entry_item)

# Maintain window size
if len(search_history.completed_entries) > search_history.window_size:
Expand All @@ -101,37 +108,22 @@ def sync_search_data(
manager.sync_with_redis(size_limit=self.window_size)
return manager

def get_pre_memories(self, user_id: str, mem_cube_id: str) -> list:
"""
Get pre-computed memories from the most recent completed search entry.

Args:
user_id: User identifier
mem_cube_id: Memory cube identifier

Returns:
List of TextualMemoryItem objects from the most recent completed search
"""
manager = self.get_search_history_manager(user_id, mem_cube_id)

existing_data = manager.load_from_db()
if existing_data is None:
return []

search_history: APISearchHistoryManager = existing_data

# Get memories from the most recent completed entry
history_memories = search_history.get_history_memories(turns=self.pre_memory_turns)
return history_memories

def get_history_memories(self, user_id: str, mem_cube_id: str, n: int) -> list:
def get_history_memories(
self, user_id: str, mem_cube_id: str, turns: int | None = None
) -> list:
"""Get history memories for backward compatibility with tests."""
logger.info(
f"Getting history memories for user_id: {user_id}, mem_cube_id: {mem_cube_id}, turns: {turns}"
)
manager = self.get_search_history_manager(user_id, mem_cube_id)
existing_data = manager.load_from_db()

if existing_data is None:
return []

if turns is None:
turns = self.history_memory_turns

# Handle different data formats
if isinstance(existing_data, APISearchHistoryManager):
search_history = existing_data
Expand All @@ -142,4 +134,4 @@ def get_history_memories(self, user_id: str, mem_cube_id: str, n: int) -> list:
except Exception:
return []

return search_history.get_history_memories(turns=n)
return search_history.get_history_memories(turns=turns)
4 changes: 2 additions & 2 deletions src/memos/mem_scheduler/monitors/general_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def __init__(
] = {}

# Lifecycle monitor
self.last_activation_mem_update_time = datetime.min
self.last_query_consume_time = datetime.min
self.last_activation_mem_update_time = get_utc_now()
self.last_query_consume_time = get_utc_now()

self._register_lock = Lock()
self._process_llm = process_llm
Expand Down
Loading