def _truncate_with_rules(text: str, *, cjk_limit: int = 32, default_limit: int = 64) -> str:
    """Flatten *text* to a single line and clip it for console display.

    Text that contains at least one CJK Unified Ideograph (U+4E00-U+9FFF) is
    clipped to ``cjk_limit`` characters, any other text to ``default_limit``
    characters. Clipped text gets a trailing ``"..."``.

    Args:
        text: Raw log content; ``None`` or an empty string yields ``""``.
        cjk_limit: Maximum length (in characters) for text containing CJK
            ideographs.
        default_limit: Maximum length (in characters) for all other text.

    Returns:
        The stripped, single-line and possibly clipped text.
    """
    if not text:
        return ""

    # The ideograph check runs on the raw text, before any normalisation.
    has_cjk = any("\u4e00" <= ch <= "\u9fff" for ch in text)
    limit = cjk_limit if has_cjk else default_limit

    # splitlines() treats "\r\n" and "\r" as line breaks as well as "\n", so no
    # carriage return survives into the printed line (a stray "\r" would move
    # the cursor back and overwrite the output). Each break becomes one space.
    normalized = " ".join(text.strip().splitlines())

    if len(normalized) <= limit:
        return normalized
    return normalized[:limit] + "..."
def _format_title(ts: datetime, title_text: str) -> str:
    """Return ``"HH:MM:SS <title_text>"`` for a log entry.

    Args:
        ts: Timestamp of the log entry.
        title_text: Summary text placed after the time.
    """
    # astimezone() with no argument converts to the system's local time zone.
    return f"{ts.astimezone().strftime('%H:%M:%S')} {title_text}"


def _cube_display_from(mem_cube_id: str) -> str:
    """Derive a display name for a mem cube from its id.

    Ids containing "public" (case-insensitive) map to ``"PublicMemCube"``;
    every other id, including ``None`` or an empty string, maps to
    ``"UserMemCube"``.
    """
    if "public" in (mem_cube_id or "").lower():
        return "PublicMemCube"
    return "UserMemCube"


# Abbreviations used in the "[from→to]" prefix of scheduleMemory entries.
# Memory types missing from this table are printed unabbreviated.
_TYPE_SHORT = {
    "LongTermMemory": "LTM",
    "UserMemory": "User",
    "WorkingMemory": "Working",
    "ActivationMemory": "Activation",
    "ParameterMemory": "Parameter",
    "TextMemory": "Text",
    "UserInput": "Input",
    "NotApplicable": "NA",
}


def _format_entry(item: ScheduleLogForWebItem) -> tuple[str, str]:
    """Render one web-log item as a ``(title, detail)`` pair for printing.

    The title is a timestamped one-line summary chosen by ``item.label``; the
    detail is the entry's main content passed through ``_truncate_with_rules``.
    Both the web-facing label names (``"addMessage"``, ``"addMemory"`` ...) and
    the scheduler's label constants are accepted. Unrecognised labels fall
    back to a generic ``"<cube> event"`` title.

    Args:
        item: Log item to render. The structured fields ``memcube_name``,
            ``memcube_log_content`` and ``memory_len`` are read with
            ``getattr`` and may be absent or ``None``.

    Returns:
        ``(title, detail)`` strings.
    """
    cube_display = getattr(item, "memcube_name", None) or _cube_display_from(item.mem_cube_id)
    label = item.label
    content = item.log_content or ""
    memcube_content = getattr(item, "memcube_log_content", None) or []

    # Derive a count only when the field is missing. An explicit 0 is a real
    # count and must not be replaced by the fallback of 1.
    memory_len = getattr(item, "memory_len", None)
    if memory_len is None:
        memory_len = len(memcube_content) or 1

    def _text_of(entry: dict) -> str:
        # "content" may be missing or explicitly None; always hand back a str.
        return entry.get("content") or ""

    def _first_content() -> str:
        # Prefer the first structured entry, then the legacy text field.
        if memcube_content:
            return _text_of(memcube_content[0]) or content
        return content

    def _entry(title_text: str, detail: str) -> tuple[str, str]:
        return _format_title(item.timestamp, title_text), _truncate_with_rules(detail)

    if label in ("addMessage", QUERY_LABEL, ANSWER_LABEL):
        target_cube = cube_display.replace("MemCube", "")
        return _entry(f"addMessages to {target_cube} MemCube", _first_content())

    # Events that differ only in the verb shown in the title.
    simple_events = (
        (("addMemory", ADD_LABEL), "added"),
        (("updateMemory", MEM_UPDATE_LABEL), "updated"),
        (("archiveMemory", MEM_ARCHIVE_LABEL), "archived"),
    )
    for labels, verb in simple_events:
        if label in labels:
            return _entry(f"{cube_display} {verb} {memory_len} memories", _first_content())

    if label in ("mergeMemory", MEM_ORGANIZE_LABEL):
        merged = [c for c in memcube_content if c.get("type") == "merged"]
        post = [c for c in memcube_content if c.get("type") == "postMerge"]
        parts = []
        if merged:
            parts.append("Merged: " + " | ".join(_text_of(c) for c in merged))
        if post:
            parts.append("Result: " + " | ".join(_text_of(c) for c in post))
        detail = " ".join(parts) if parts else _first_content()
        return _entry(f"{cube_display} merged {memory_len} memories", detail)

    if label == "scheduleMemory":
        title_text = f"{cube_display} scheduled {memory_len} memories"
        if memcube_content:
            return _entry(title_text, _text_of(memcube_content[0]))
        # Legacy items carry only plain text; rebuild the "[from→to] key: text" form.
        key = transform_name_to_key(content)
        from_short = _TYPE_SHORT.get(item.from_memory_type, item.from_memory_type)
        to_short = _TYPE_SHORT.get(item.to_memory_type, item.to_memory_type)
        return _entry(title_text, f"[{from_short}→{to_short}] {key}: {content}")

    return _entry(f"{cube_display} event", _first_content())
@@ -84,24 +175,25 @@ def show_web_logs(mem_scheduler: GeneralScheduler): # Create a temporary queue to preserve the original queue contents temp_queue = Queue() - log_count = 0 + collected: list[ScheduleLogForWebItem] = [] while not mem_scheduler._web_log_message_queue.empty(): log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get() + collected.append(log_item) temp_queue.put(log_item) - log_count += 1 - - # Print log entry details - print(f"\nLog Entry #{log_count}:") - print(f'- "{log_item.label}" log: {log_item}') + for idx, log_item in enumerate(sorted(collected, key=lambda x: x.timestamp, reverse=True), 1): + title, content = _format_entry(log_item) + print(f"\nLog Entry #{idx}:") + print(title) + print(content) print("-" * 50) # Restore items back to the original queue while not temp_queue.empty(): mem_scheduler._web_log_message_queue.put(temp_queue.get()) - print(f"\nTotal {log_count} web log entries displayed.") + print(f"\nTotal {len(collected)} web log entries displayed.") print("=" * 110 + "\n") diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index b02569e6..b3b51730 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -98,6 +98,7 @@ def add_memories(add_req: APIADDRequest): # Scheduler API Endpoints # ============================================================================= + @router.get("/scheduler/status", summary="Get scheduler running status") def scheduler_status(user_name: str | None = None): """Get scheduler running status.""" diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 6ad7f5cd..63b87157 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -566,20 +566,71 @@ def _submit_web_logs( def get_web_log_messages(self) -> list[dict]: """ - Retrieves all web log messages from the queue and returns them as a list of 
JSON-serializable dictionaries. - - Returns: - List[dict]: A list of dictionaries representing ScheduleLogForWebItem objects, - ready for JSON serialization. The list is ordered from oldest to newest. + Retrieve structured log messages from the queue and return JSON-serializable dicts. """ - messages = [] + raw_items: list[ScheduleLogForWebItem] = [] while True: try: - item = self._web_log_message_queue.get_nowait() # Thread-safe get - messages.append(item.to_dict()) + raw_items.append(self._web_log_message_queue.get_nowait()) except Exception: break - return messages + + def _map_label(label: str) -> str: + from memos.mem_scheduler.schemas.general_schemas import ( + QUERY_LABEL, + ANSWER_LABEL, + ADD_LABEL, + MEM_UPDATE_LABEL, + MEM_ORGANIZE_LABEL, + MEM_ARCHIVE_LABEL, + ) + + mapping = { + QUERY_LABEL: "addMessage", + ANSWER_LABEL: "addMessage", + ADD_LABEL: "addMemory", + MEM_UPDATE_LABEL: "updateMemory", + MEM_ORGANIZE_LABEL: "mergeMemory", + MEM_ARCHIVE_LABEL: "archiveMemory", + } + return mapping.get(label, label) + + def _normalize_item(item: ScheduleLogForWebItem) -> dict: + data = item.to_dict() + data["label"] = _map_label(data.get("label")) + memcube_content = getattr(item, "memcube_log_content", None) or [] + metadata = getattr(item, "metadata", None) or [] + + memcube_name = getattr(item, "memcube_name", None) + if not memcube_name and hasattr(self, "_map_memcube_name"): + memcube_name = self._map_memcube_name(item.mem_cube_id) + data["memcube_name"] = memcube_name + + memory_len = getattr(item, "memory_len", None) + if memory_len is None: + if data["label"] == "mergeMemory": + memory_len = len([c for c in memcube_content if c.get("type") != "postMerge"]) + elif memcube_content: + memory_len = len(memcube_content) + else: + memory_len = 1 if item.log_content else 0 + + data["memcube_log_content"] = memcube_content + data["memory_len"] = memory_len + + def _with_memory_time(meta: dict) -> dict: + enriched = dict(meta) + if "memory_time" not in enriched: 
+ enriched["memory_time"] = enriched.get("updated_at") or enriched.get( + "update_at" + ) + return enriched + + data["metadata"] = [_with_memory_time(m) for m in metadata] + data["log_title"] = "" + return data + + return [_normalize_item(it) for it in raw_items] def _message_consumer(self) -> None: """ diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index d35a4f10..3859c9e6 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -1,18 +1,18 @@ from collections.abc import Callable from memos.log import get_logger -from memos.mem_cube.base import BaseMemCube +from memos.mem_cube.general import GeneralMemCube from memos.mem_scheduler.general_modules.base import BaseSchedulerModule from memos.mem_scheduler.schemas.general_schemas import ( ACTIVATION_MEMORY_TYPE, ADD_LABEL, - LONG_TERM_MEMORY_TYPE, NOT_INITIALIZED, PARAMETER_MEMORY_TYPE, - QUERY_LABEL, TEXT_MEMORY_TYPE, USER_INPUT_TYPE, WORKING_MEMORY_TYPE, + MEM_UPDATE_LABEL, + MEM_ARCHIVE_LABEL, ) from memos.mem_scheduler.schemas.message_schemas import ( ScheduleLogForWebItem, @@ -23,6 +23,7 @@ ) from memos.mem_scheduler.utils.misc_utils import log_exceptions from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory +import hashlib logger = get_logger(__name__) @@ -44,7 +45,7 @@ def create_autofilled_log_item( to_memory_type: str, user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, ) -> ScheduleLogForWebItem: text_mem_base: TreeTextMemory = mem_cube.text_mem current_memory_sizes = text_mem_base.get_current_memory_size() @@ -98,6 +99,41 @@ def create_autofilled_log_item( ) return log_message + @log_exceptions(logger=logger) + def create_event_log( + self, + label: str, + from_memory_type: str, + to_memory_type: str, + user_id: str, + mem_cube_id: str, + mem_cube: GeneralMemCube, + 
memcube_log_content: list[dict], + metadata: list[dict], + memory_len: int, + memcube_name: str | None = None, + ) -> ScheduleLogForWebItem: + item = self.create_autofilled_log_item( + log_content="", + label=label, + from_memory_type=from_memory_type, + to_memory_type=to_memory_type, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + ) + item.memcube_log_content = memcube_log_content + item.metadata = metadata + item.memory_len = memory_len + item.memcube_name = memcube_name or self._map_memcube_name(mem_cube_id) + return item + + def _map_memcube_name(self, mem_cube_id: str) -> str: + x = mem_cube_id or "" + if "public" in x.lower(): + return "PublicMemCube" + return "UserMemCube" + # TODO: Log output count is incorrect @log_exceptions(logger=logger) def log_working_memory_replacement( @@ -106,54 +142,57 @@ def log_working_memory_replacement( new_memory: list[TextualMemoryItem], user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], ): """Log changes when working memory is replaced.""" - memory_type_map = { - transform_name_to_key(name=m.memory): m.metadata.memory_type - for m in original_memory + new_memory - } - original_text_memories = [m.memory for m in original_memory] new_text_memories = [m.memory for m in new_memory] - - # Convert to sets for efficient difference operations original_set = set(original_text_memories) new_set = set(new_text_memories) - - # Identify changes - added_memories = list(new_set - original_set) # Present in new but not original - - # recording messages - log_messages = [] - for memory in added_memories: - normalized_mem = transform_name_to_key(name=memory) - if normalized_mem not in memory_type_map: - logger.error(f"Memory text not found in type mapping: {memory[:50]}...") - # Get the memory type from the map, default to LONG_TERM_MEMORY_TYPE if not found - mem_type = memory_type_map.get(normalized_mem, 
LONG_TERM_MEMORY_TYPE) - - if mem_type == WORKING_MEMORY_TYPE: - logger.warning(f"Memory already in working memory: {memory[:50]}...") + added_texts = list(new_set - original_set) + memcube_content = [] + meta = [] + by_text = {m.memory: m for m in new_memory} + for t in added_texts: + itm = by_text.get(t) + if not itm: continue - - log_message = self.create_autofilled_log_item( - log_content=memory, - label=QUERY_LABEL, - from_memory_type=mem_type, - to_memory_type=WORKING_MEMORY_TYPE, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, + key_name = getattr(itm.metadata, "key", None) or itm.memory + k = transform_name_to_key(name=key_name) + memcube_content.append( + { + "content": f"[{itm.metadata.memory_type}→{WORKING_MEMORY_TYPE}] {k}: {itm.memory}", + "ref_id": itm.id, + } ) - log_messages.append(log_message) - - logger.info( - f"{len(added_memories)} {LONG_TERM_MEMORY_TYPE} memorie(s) " - f"transformed to {WORKING_MEMORY_TYPE} memories." + meta.append( + { + "ref_id": itm.id, + "id": itm.id, + "key": itm.metadata.key, + "memory": itm.memory, + "memory_type": itm.metadata.memory_type, + "status": itm.metadata.status, + "confidence": itm.metadata.confidence, + "tags": itm.metadata.tags, + "updated_at": getattr(itm.metadata, "updated_at", None) + or getattr(itm.metadata, "update_at", None), + } + ) + ev = self.create_event_log( + label="scheduleMemory", + from_memory_type=TEXT_MEMORY_TYPE, + to_memory_type=WORKING_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=memcube_content, + metadata=meta, + memory_len=len(memcube_content), + memcube_name=self._map_memcube_name(mem_cube_id), ) - log_func_callback(log_messages) + log_func_callback([ev]) @log_exceptions(logger=logger) def log_activation_memory_update( @@ -163,49 +202,51 @@ def log_activation_memory_update( label: str, user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, log_func_callback: 
Callable[[list[ScheduleLogForWebItem]], None], ): """Log changes when activation memory is updated.""" original_set = set(original_text_memories) new_set = set(new_text_memories) - # Identify changes - added_memories = list(new_set - original_set) # Present in new but not original - - # recording messages - log_messages = [] + added_memories = list(new_set - original_set) + memcube_content = [] + meta = [] for mem in added_memories: - log_message_a = self.create_autofilled_log_item( - log_content=mem, - label=label, - from_memory_type=TEXT_MEMORY_TYPE, - to_memory_type=ACTIVATION_MEMORY_TYPE, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, + key = transform_name_to_key(mem) + ref_id = f"actparam-{hashlib.md5(mem.encode()).hexdigest()}" + memcube_content.append( + { + "content": f"[{ACTIVATION_MEMORY_TYPE}→{PARAMETER_MEMORY_TYPE}] {key}: {mem}", + "ref_id": ref_id, + } ) - logger.info( - f"{len(added_memories)} {TEXT_MEMORY_TYPE} memorie(s) " - f"transformed to {ACTIVATION_MEMORY_TYPE} memories." - ) - - log_message_b = self.create_autofilled_log_item( - log_content=mem, - label=label, - from_memory_type=ACTIVATION_MEMORY_TYPE, - to_memory_type=PARAMETER_MEMORY_TYPE, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, + meta.append( + { + "ref_id": ref_id, + "id": ref_id, + "key": key, + "memory": mem, + "memory_type": ACTIVATION_MEMORY_TYPE, + "status": None, + "confidence": None, + "tags": None, + "updated_at": None, + } ) - - log_messages.extend([log_message_a, log_message_b]) - logger.info( - f"{len(added_memories)} {ACTIVATION_MEMORY_TYPE} memorie(s) " - f"transformed to {PARAMETER_MEMORY_TYPE} memories." 
+ ev = self.create_event_log( + label="scheduleMemory", + from_memory_type=ACTIVATION_MEMORY_TYPE, + to_memory_type=PARAMETER_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=memcube_content, + metadata=meta, + memory_len=len(added_memories), + memcube_name=self._map_memcube_name(mem_cube_id), ) - log_func_callback(log_messages) + log_func_callback([ev]) @log_exceptions(logger=logger) def log_adding_memory( @@ -214,10 +255,10 @@ def log_adding_memory( memory_type: str, user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], ): - """Log changes when working memory is replaced.""" + """Deprecated: legacy text log. Use create_event_log with structured fields instead.""" log_message = self.create_autofilled_log_item( log_content=memory, label=ADD_LABEL, @@ -233,6 +274,50 @@ def log_adding_memory( f"converted to {memory_type} memory in mem_cube {mem_cube_id}: {memory}" ) + @log_exceptions(logger=logger) + def log_updating_memory( + self, + memory: str, + memory_type: str, + user_id: str, + mem_cube_id: str, + mem_cube: GeneralMemCube, + log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], + ): + """Deprecated: legacy text log. Use create_event_log with structured fields instead.""" + log_message = self.create_autofilled_log_item( + log_content=memory, + label=MEM_UPDATE_LABEL, + from_memory_type=memory_type, + to_memory_type=memory_type, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + ) + log_func_callback([log_message]) + + @log_exceptions(logger=logger) + def log_archiving_memory( + self, + memory: str, + memory_type: str, + user_id: str, + mem_cube_id: str, + mem_cube: GeneralMemCube, + log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], + ): + """Deprecated: legacy text log. 
Use create_event_log with structured fields instead.""" + log_message = self.create_autofilled_log_item( + log_content=memory, + label=MEM_ARCHIVE_LABEL, + from_memory_type=memory_type, + to_memory_type=memory_type, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + ) + log_func_callback([log_message]) + @log_exceptions(logger=logger) def validate_schedule_message(self, message: ScheduleMessageItem, label: str): """Validate if the message matches the expected label. diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 92e31788..eeca890a 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -1,11 +1,11 @@ import concurrent.futures +import contextlib import json import traceback from memos.configs.mem_scheduler import GeneralSchedulerConfig from memos.context.context import ContextThreadPoolExecutor from memos.log import get_logger -from memos.mem_cube.base import BaseMemCube from memos.mem_cube.general import GeneralMemCube from memos.mem_scheduler.base_scheduler import BaseScheduler from memos.mem_scheduler.schemas.general_schemas import ( @@ -17,13 +17,19 @@ PREF_ADD_LABEL, QUERY_LABEL, WORKING_MEMORY_TYPE, + USER_INPUT_TYPE, + NOT_APPLICABLE_TYPE, + LONG_TERM_MEMORY_TYPE, MemCubeID, UserID, ) from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.monitor_schemas import QueryMonitorItem -from memos.mem_scheduler.utils.filter_utils import is_all_chinese, is_all_english -from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube +from memos.mem_scheduler.utils.filter_utils import ( + is_all_chinese, + is_all_english, + transform_name_to_key, +) from memos.memories.textual.item import TextualMemoryItem from memos.memories.textual.preference import PreferenceTextMemory from memos.memories.textual.tree import TreeTextMemory @@ -139,7 +145,7 @@ def 
long_memory_update_process( label=QUERY_LABEL, user_id=user_id, mem_cube_id=mem_cube_id, - mem_cube=self.mem_cube, + mem_cube=self.current_mem_cube, ) def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: @@ -151,18 +157,40 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: """ logger.info(f"Messages {messages} assigned to {QUERY_LABEL} handler.") - # Process the query in a session turn - grouped_messages = group_messages_by_user_and_mem_cube(messages) + grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages) self.validate_schedule_messages(messages=messages, label=QUERY_LABEL) for user_id in grouped_messages: for mem_cube_id in grouped_messages[user_id]: - messages = grouped_messages[user_id][mem_cube_id] - if len(messages) == 0: - return + batch = grouped_messages[user_id][mem_cube_id] + if not batch: + continue + try: + for msg in batch: + event = self.create_event_log( + label="addMessage", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=NOT_APPLICABLE_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=[ + { + "content": f"[User] {msg.content}", + "ref_id": msg.item_id, + "role": "user", + } + ], + metadata=[], + memory_len=1, + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + self._submit_web_logs([event]) + except Exception: + logger.exception("Failed to record addMessage log for query") self.long_memory_update_process( - user_id=user_id, mem_cube_id=mem_cube_id, messages=messages + user_id=user_id, mem_cube_id=mem_cube_id, messages=batch ) def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: @@ -173,63 +201,155 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: messages: List of answer messages to process """ logger.info(f"Messages {messages} assigned to {ANSWER_LABEL} handler.") - # Process the query in a session turn - 
grouped_messages = group_messages_by_user_and_mem_cube(messages) + grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages) self.validate_schedule_messages(messages=messages, label=ANSWER_LABEL) for user_id in grouped_messages: for mem_cube_id in grouped_messages[user_id]: - messages = grouped_messages[user_id][mem_cube_id] - if len(messages) == 0: - return + batch = grouped_messages[user_id][mem_cube_id] + if not batch: + continue + try: + for msg in batch: + event = self.create_event_log( + label="addMessage", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=NOT_APPLICABLE_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=[ + { + "content": f"[Assistant] {msg.content}", + "ref_id": msg.item_id, + "role": "assistant", + } + ], + metadata=[], + memory_len=1, + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + self._submit_web_logs([event]) + except Exception: + logger.exception("Failed to record addMessage log for answer") def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") # Process the query in a session turn - grouped_messages = group_messages_by_user_and_mem_cube(messages) - mem_cube = self.mem_cube + grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages) self.validate_schedule_messages(messages=messages, label=ADD_LABEL) try: for user_id in grouped_messages: for mem_cube_id in grouped_messages[user_id]: - messages = grouped_messages[user_id][mem_cube_id] - if len(messages) == 0: - return + batch = grouped_messages[user_id][mem_cube_id] + if not batch: + continue - # submit logs - for msg in messages: + for msg in batch: try: userinput_memory_ids = json.loads(msg.content) except Exception as e: logger.error(f"Error: {e}. 
Content: {msg.content}", exc_info=True) userinput_memory_ids = [] + mem_items: list[TextualMemoryItem] = [] for memory_id in userinput_memory_ids: try: - mem_item: TextualMemoryItem = mem_cube.text_mem.get( + mem_item: TextualMemoryItem = self.current_mem_cube.text_mem.get( memory_id=memory_id ) + mem_items.append(mem_item) except Exception: logger.warning( f"This MemoryItem {memory_id} has already been deleted." ) continue - mem_type = mem_item.metadata.memory_type - mem_content = mem_item.memory - - if mem_type == WORKING_MEMORY_TYPE: + add_content: list[dict] = [] + add_meta: list[dict] = [] + update_content: list[dict] = [] + update_meta: list[dict] = [] + for mem_item in mem_items: + if mem_item.metadata.memory_type == WORKING_MEMORY_TYPE: continue - - self.log_adding_memory( - memory=mem_content, - memory_type=mem_type, - user_id=msg.user_id, - mem_cube_id=msg.mem_cube_id, - mem_cube=self.mem_cube, - log_func_callback=self._submit_web_logs, + key = getattr(mem_item.metadata, "key", None) or transform_name_to_key( + name=mem_item.memory ) + exists = False + try: + text_mem = self.current_mem_cube.text_mem + if key and hasattr(text_mem, "graph_store"): + candidates = text_mem.graph_store.get_by_metadata( + [ + {"field": "memory", "op": "=", "value": key}, + { + "field": "memory_type", + "op": "=", + "value": mem_item.metadata.memory_type, + }, + ] + ) + exists = bool(candidates) + except Exception: + exists = False + + payload = { + "content": f"{key}: {mem_item.memory}", + "ref_id": mem_item.id, + } + meta_dict = { + "ref_id": mem_item.id, + "id": mem_item.id, + "key": mem_item.metadata.key, + "memory": mem_item.memory, + "memory_type": mem_item.metadata.memory_type, + "status": mem_item.metadata.status, + "confidence": mem_item.metadata.confidence, + "tags": mem_item.metadata.tags, + "updated_at": getattr(mem_item.metadata, "updated_at", None) + or getattr(mem_item.metadata, "update_at", None), + } + if exists: + update_content.append(payload) + 
update_meta.append(meta_dict) + else: + add_content.append(payload) + add_meta.append(meta_dict) + + events = [] + if add_content: + events.append( + self.create_event_log( + label="addMemory", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=add_content, + metadata=add_meta, + memory_len=len(add_content), + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + ) + if update_content: + events.append( + self.create_event_log( + label="updateMemory", + from_memory_type=LONG_TERM_MEMORY_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=update_content, + metadata=update_meta, + memory_len=len(update_content), + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + ) + if events: + self._submit_web_logs(events) except Exception as e: logger.error(f"Error: {e}", exc_info=True) @@ -241,7 +361,7 @@ def process_message(message: ScheduleMessageItem): try: user_id = message.user_id mem_cube_id = message.mem_cube_id - mem_cube = self.mem_cube + mem_cube = self.current_mem_cube content = message.content user_name = message.user_name @@ -402,7 +522,7 @@ def process_message(message: ScheduleMessageItem): try: user_id = message.user_id mem_cube_id = message.mem_cube_id - mem_cube = self.mem_cube + mem_cube = self.current_mem_cube content = message.content user_name = message.user_name @@ -431,6 +551,129 @@ def process_message(message: ScheduleMessageItem): user_name=user_name, ) + with contextlib.suppress(Exception): + mem_items: list[TextualMemoryItem] = [] + for mid in mem_ids: + with contextlib.suppress(Exception): + mem_items.append(text_mem.get(mid)) + if len(mem_items) > 1: + keys: list[str] = [] + memcube_content: list[dict] = [] + meta: list[dict] = [] + merged_target_ids: set[str] = set() + with 
contextlib.suppress(Exception): + if hasattr(text_mem, "graph_store"): + for mid in mem_ids: + edges = text_mem.graph_store.get_edges( + mid, type="MERGED_TO", direction="OUT" + ) + for edge in edges: + target = ( + edge.get("to") or edge.get("dst") or edge.get("target") + ) + if target: + merged_target_ids.add(target) + for item in mem_items: + key = getattr( + getattr(item, "metadata", {}), "key", None + ) or transform_name_to_key(getattr(item, "memory", "")) + keys.append(key) + memcube_content.append( + {"content": key or "(no key)", "ref_id": item.id, "type": "merged"} + ) + meta.append( + { + "ref_id": item.id, + "id": item.id, + "key": key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), + } + ) + combined_key = keys[0] if keys else "" + post_ref_id = None + post_meta = { + "ref_id": None, + "id": None, + "key": None, + "memory": None, + "memory_type": None, + "status": None, + "confidence": None, + "tags": None, + "updated_at": None, + } + if merged_target_ids: + post_ref_id = next(iter(merged_target_ids)) + with contextlib.suppress(Exception): + merged_item = text_mem.get(post_ref_id) + combined_key = ( + getattr(getattr(merged_item, "metadata", {}), "key", None) + or combined_key + ) + post_meta = { + "ref_id": post_ref_id, + "id": post_ref_id, + "key": getattr( + getattr(merged_item, "metadata", {}), "key", None + ), + "memory": getattr(merged_item, "memory", None), + "memory_type": getattr( + getattr(merged_item, "metadata", {}), "memory_type", None + ), + "status": getattr( + getattr(merged_item, "metadata", {}), "status", None + ), + "confidence": getattr( + getattr(merged_item, "metadata", {}), "confidence", None + ), + "tags": getattr( + getattr(merged_item, "metadata", {}), "tags", None + ), + "updated_at": getattr( + 
getattr(merged_item, "metadata", {}), "updated_at", None + ) + or getattr( + getattr(merged_item, "metadata", {}), "update_at", None + ), + } + if not post_ref_id: + import hashlib + + post_ref_id = f"merge-{hashlib.md5(''.join(sorted(mem_ids)).encode()).hexdigest()}" + post_meta["ref_id"] = post_ref_id + post_meta["id"] = post_ref_id + if not post_meta.get("key"): + post_meta["key"] = combined_key + if not keys: + keys = [item.id for item in mem_items] + memcube_content.append( + { + "content": combined_key if combined_key else "(no key)", + "ref_id": post_ref_id, + "type": "postMerge", + } + ) + meta.append(post_meta) + event = self.create_event_log( + label="mergeMemory", + from_memory_type=LONG_TERM_MEMORY_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=memcube_content, + metadata=meta, + memory_len=len(keys), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + self._submit_web_logs([event]) + logger.info( f"Successfully processed mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}" ) @@ -451,7 +694,7 @@ def _process_memories_with_reorganize( mem_ids: list[str], user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, text_mem: TreeTextMemory, user_name: str, ) -> None: @@ -503,7 +746,7 @@ def _pref_add_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non def process_message(message: ScheduleMessageItem): try: - mem_cube = self.mem_cube + mem_cube = self.current_mem_cube user_id = message.user_id session_id = message.session_id diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index 8dd51c5b..089a7cc6 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -29,6 +29,8 @@ class FineStrategy(str, Enum): ADD_LABEL = "add" MEM_READ_LABEL = "mem_read" MEM_ORGANIZE_LABEL = "mem_organize" 
+MEM_UPDATE_LABEL = "mem_update" +MEM_ARCHIVE_LABEL = "mem_archive" API_MIX_SEARCH_LABEL = "api_mix_search" PREF_ADD_LABEL = "pref_add" diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index f1d48f3f..d7e94e0e 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -133,6 +133,14 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin): default_factory=get_utc_now, description="Timestamp indicating when the log entry was created", ) + memcube_log_content: list[dict] | None = Field( + default=None, description="Structured memcube log content list" + ) + metadata: list[dict] | None = Field( + default=None, description="Structured metadata list for each log item" + ) + memcube_name: str | None = Field(default=None, description="Display name for memcube") + memory_len: int | None = Field(default=None, description="Count of items involved in the event") def debug_info(self) -> dict[str, Any]: """Return structured debug information for logging purposes."""