From 94bcea3abe67b67cbbd11fc93ca1de9f5a21c26d Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 20 Nov 2025 18:41:13 +0800 Subject: [PATCH 1/5] feat: reapply structured memcube logs --- examples/mem_scheduler/memos_w_scheduler.py | 122 ++++++- src/memos/mem_scheduler/base_scheduler.py | 67 +++- .../general_modules/scheduler_logger.py | 235 ++++++++----- src/memos/mem_scheduler/general_scheduler.py | 313 +++++++++++++++--- .../mem_scheduler/schemas/general_schemas.py | 2 + .../mem_scheduler/schemas/message_schemas.py | 12 + 6 files changed, 610 insertions(+), 141 deletions(-) diff --git a/examples/mem_scheduler/memos_w_scheduler.py b/examples/mem_scheduler/memos_w_scheduler.py index c523a8667..3a4fbbd57 100644 --- a/examples/mem_scheduler/memos_w_scheduler.py +++ b/examples/mem_scheduler/memos_w_scheduler.py @@ -3,23 +3,29 @@ from pathlib import Path from queue import Queue -from typing import TYPE_CHECKING - from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig +from datetime import datetime +import re + from memos.configs.mem_scheduler import AuthConfig from memos.log import get_logger from memos.mem_cube.general import GeneralMemCube from memos.mem_os.main import MOS +from memos.mem_scheduler.schemas.general_schemas import ( + QUERY_LABEL, + ANSWER_LABEL, + ADD_LABEL, + MEM_ORGANIZE_LABEL, + MEM_UPDATE_LABEL, + MEM_ARCHIVE_LABEL, + NOT_APPLICABLE_TYPE, +) +from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem +from memos.mem_scheduler.utils.filter_utils import transform_name_to_key from memos.mem_scheduler.general_scheduler import GeneralScheduler -if TYPE_CHECKING: - from memos.mem_scheduler.schemas.message_schemas import ( - ScheduleLogForWebItem, - ) - - FILE_PATH = Path(__file__).absolute() BASE_DIR = FILE_PATH.parent.parent.parent sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory @@ -70,6 +76,91 @@ def init_task(): return conversations, questions +def _truncate_with_rules(text: str) -> str: + has_cjk = bool(re.search(r"[\u4e00-\u9fff]", text)) + limit = 32 if has_cjk else 64 + normalized = text.strip().replace("\n", " ") + if len(normalized) <= limit: + return normalized + return normalized[: limit - 1] + "… 查看详情" + + +def _format_title(ts: datetime, title_text: str) -> str: + return f"{ts.astimezone().strftime('%H:%M:%S')} {title_text}" + + +def _cube_display_from(mem_cube_id: str) -> str: + if "public" in (mem_cube_id or "").lower(): + return "PublicMemCube" + return "UserMemCube" + + +_TYPE_ZH = { + "LongTermMemory": "长期", + "UserMemory": "用户", + "WorkingMemory": "工作", + "ActivationMemory": "激活", + "ParameterMemory": "参数", + "TextMemory": "明文", + "UserInput": "消息", + "NotApplicable": "NA", +} + + +def _format_entry(item: ScheduleLogForWebItem) -> tuple[str, str]: + cube_display = getattr(item, "memcube_name", None) or _cube_display_from(item.mem_cube_id) + label = item.label + content = item.log_content or "" + memcube_content = getattr(item, "memcube_log_content", None) or [] + memory_len = getattr(item, "memory_len", None) or len(memcube_content) or 1 + + def _first_content() -> str: + if memcube_content: + return memcube_content[0].get("content", "") or content + return content + + if label in ("addMessage", QUERY_LABEL, ANSWER_LABEL): + target_cube = cube_display.replace("MemCube", "") + title = _format_title(item.timestamp, f"addMessages至{target_cube} MemCube") + return title, _truncate_with_rules(_first_content()) + + if label in ("addMemory", ADD_LABEL): + title = _format_title(item.timestamp, f"{cube_display}新增了{memory_len}条记忆") + return title, _truncate_with_rules(_first_content()) + + if label in ("updateMemory", MEM_UPDATE_LABEL): + title = _format_title(item.timestamp, f"{cube_display}更新了{memory_len}条记忆") + return title, _truncate_with_rules(_first_content()) + + if label in ("archiveMemory", MEM_ARCHIVE_LABEL): + title = _format_title(item.timestamp, f"{cube_display}归档了{memory_len}条记忆") + return title, _truncate_with_rules(_first_content()) + + if label in ("mergeMemory", MEM_ORGANIZE_LABEL): + title = _format_title(item.timestamp, f"{cube_display}合并了{memory_len}条记忆") + merged = [c for c in memcube_content if c.get("type") == "merged"] + post = [c for c in memcube_content if c.get("type") == "postMerge"] + parts = [] + if merged: + parts.append("被合并的记忆: " + " | ".join(c.get("content", "") for c in merged)) + if post: + parts.append("合并后的记忆: " + " | ".join(c.get("content", "") for c in post)) + detail = " ".join(parts) if parts else _first_content() + return title, _truncate_with_rules(detail) + + if label == "scheduleMemory": + title = _format_title(item.timestamp, f"{cube_display}调度了{memory_len}条记忆") + if memcube_content: + return title, _truncate_with_rules(memcube_content[0].get("content", "")) + key = transform_name_to_key(content) + from_zh = _TYPE_ZH.get(item.from_memory_type, item.from_memory_type) + to_zh = _TYPE_ZH.get(item.to_memory_type, item.to_memory_type) + return title, _truncate_with_rules(f"[{from_zh}→{to_zh}] {key}:{content}") + + title = _format_title(item.timestamp, f"{cube_display}事件") + return title, _truncate_with_rules(_first_content()) + + def show_web_logs(mem_scheduler: GeneralScheduler): """Display all web log entries from the scheduler's log queue. @@ -84,24 +175,25 @@ def show_web_logs(mem_scheduler: GeneralScheduler): # Create a temporary queue to preserve the original queue contents temp_queue = Queue() - log_count = 0 + collected: list[ScheduleLogForWebItem] = [] while not mem_scheduler._web_log_message_queue.empty(): log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get() + collected.append(log_item) temp_queue.put(log_item) - log_count += 1 - - # Print log entry details - print(f"\nLog Entry #{log_count}:") - print(f'- "{log_item.label}" log: {log_item}') + for idx, log_item in enumerate(sorted(collected, key=lambda x: x.timestamp, reverse=True), 1): + title, content = _format_entry(log_item) + print(f"\nLog Entry #{idx}:") + print(title) + print(content) print("-" * 50) # Restore items back to the original queue while not temp_queue.empty(): mem_scheduler._web_log_message_queue.put(temp_queue.get()) - print(f"\nTotal {log_count} web log entries displayed.") + print(f"\nTotal {len(collected)} web log entries displayed.") print("=" * 110 + "\n") diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 6ad7f5cdd..7cdaabadf 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -566,20 +566,69 @@ def _submit_web_logs( def get_web_log_messages(self) -> list[dict]: """ - Retrieves all web log messages from the queue and returns them as a list of JSON-serializable dictionaries. - - Returns: - List[dict]: A list of dictionaries representing ScheduleLogForWebItem objects, - ready for JSON serialization. The list is ordered from oldest to newest. + Retrieve structured log messages from the queue and return JSON-serializable dicts. """ - messages = [] + raw_items: list[ScheduleLogForWebItem] = [] while True: try: - item = self._web_log_message_queue.get_nowait() # Thread-safe get - messages.append(item.to_dict()) + raw_items.append(self._web_log_message_queue.get_nowait()) except Exception: break - return messages + + def _map_label(label: str) -> str: + from memos.mem_scheduler.schemas.general_schemas import ( + QUERY_LABEL, + ANSWER_LABEL, + ADD_LABEL, + MEM_UPDATE_LABEL, + MEM_ORGANIZE_LABEL, + MEM_ARCHIVE_LABEL, + ) + + mapping = { + QUERY_LABEL: "addMessage", + ANSWER_LABEL: "addMessage", + ADD_LABEL: "addMemory", + MEM_UPDATE_LABEL: "updateMemory", + MEM_ORGANIZE_LABEL: "mergeMemory", + MEM_ARCHIVE_LABEL: "archiveMemory", + } + return mapping.get(label, label) + + def _normalize_item(item: ScheduleLogForWebItem) -> dict: + data = item.to_dict() + data["label"] = _map_label(data.get("label")) + memcube_content = getattr(item, "memcube_log_content", None) or [] + metadata = getattr(item, "metadata", None) or [] + + memcube_name = getattr(item, "memcube_name", None) + if not memcube_name and hasattr(self, "_map_memcube_name"): + memcube_name = self._map_memcube_name(item.mem_cube_id) + data["memcube_name"] = memcube_name + + memory_len = getattr(item, "memory_len", None) + if memory_len is None: + if data["label"] == "mergeMemory": + memory_len = len([c for c in memcube_content if c.get("type") != "postMerge"]) + elif memcube_content: + memory_len = len(memcube_content) + else: + memory_len = 1 if item.log_content else 0 + + data["memcube_log_content"] = memcube_content + data["memory_len"] = memory_len + + def _with_memory_time(meta: dict) -> dict: + enriched = dict(meta) + if "memory_time" not in enriched: + enriched["memory_time"] = enriched.get("updated_at") or enriched.get("update_at") + return enriched + + data["metadata"] = [_with_memory_time(m) for m in metadata] + data["log_title"] = "" + return data + + return [_normalize_item(it) for it in raw_items] def _message_consumer(self) -> None: """ diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index d35a4f106..de66b6f38 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -1,18 +1,18 @@ from collections.abc import Callable from memos.log import get_logger -from memos.mem_cube.base import BaseMemCube +from memos.mem_cube.general import GeneralMemCube from memos.mem_scheduler.general_modules.base import BaseSchedulerModule from memos.mem_scheduler.schemas.general_schemas import ( ACTIVATION_MEMORY_TYPE, ADD_LABEL, - LONG_TERM_MEMORY_TYPE, NOT_INITIALIZED, PARAMETER_MEMORY_TYPE, - QUERY_LABEL, TEXT_MEMORY_TYPE, USER_INPUT_TYPE, WORKING_MEMORY_TYPE, + MEM_UPDATE_LABEL, + MEM_ARCHIVE_LABEL, ) from memos.mem_scheduler.schemas.message_schemas import ( ScheduleLogForWebItem, @@ -23,6 +23,7 @@ ) from memos.mem_scheduler.utils.misc_utils import log_exceptions from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory +import hashlib logger = get_logger(__name__) @@ -44,8 +45,8 @@ def create_autofilled_log_item( to_memory_type: str, user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, - ) -> ScheduleLogForWebItem: + mem_cube: GeneralMemCube, + ) -> ScheduleLogForWebItem: text_mem_base: TreeTextMemory = mem_cube.text_mem current_memory_sizes = text_mem_base.get_current_memory_size() current_memory_sizes = { @@ -98,6 +99,41 @@ def create_autofilled_log_item( ) return log_message + @log_exceptions(logger=logger) + def create_event_log( + self, + label: str, + from_memory_type: str, + to_memory_type: str, + user_id: str, + mem_cube_id: str, + mem_cube: GeneralMemCube, + memcube_log_content: list[dict], + metadata: list[dict], + memory_len: int, + memcube_name: str | None = None, + ) -> ScheduleLogForWebItem: + item = self.create_autofilled_log_item( + log_content="", + label=label, + from_memory_type=from_memory_type, + to_memory_type=to_memory_type, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + ) + item.memcube_log_content = memcube_log_content + item.metadata = metadata + item.memory_len = memory_len + item.memcube_name = memcube_name or self._map_memcube_name(mem_cube_id) + return item + + def _map_memcube_name(self, mem_cube_id: str) -> str: + x = mem_cube_id or "" + if "public" in x.lower(): + return "PublicMemCube" + return "UserMemCube" + # TODO: Log output count is incorrect @log_exceptions(logger=logger) def log_working_memory_replacement( @@ -106,54 +142,54 @@ def log_working_memory_replacement( new_memory: list[TextualMemoryItem], user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], ): """Log changes when working memory is replaced.""" - memory_type_map = { - transform_name_to_key(name=m.memory): m.metadata.memory_type - for m in original_memory + new_memory - } - original_text_memories = [m.memory for m in original_memory] new_text_memories = [m.memory for m in new_memory] - - # Convert to sets for efficient difference operations original_set = set(original_text_memories) new_set = set(new_text_memories) - - # Identify changes - added_memories = list(new_set - original_set) # Present in new but not original - - # recording messages - log_messages = [] - for memory in added_memories: - normalized_mem = transform_name_to_key(name=memory) - if normalized_mem not in memory_type_map: - logger.error(f"Memory text not found in type mapping: {memory[:50]}...") - # Get the memory type from the map, default to LONG_TERM_MEMORY_TYPE if not found - mem_type = memory_type_map.get(normalized_mem, LONG_TERM_MEMORY_TYPE) - - if mem_type == WORKING_MEMORY_TYPE: - logger.warning(f"Memory already in working memory: {memory[:50]}...") + added_texts = list(new_set - original_set) + memcube_content = [] + meta = [] + by_text = {m.memory: m for m in new_memory} + for t in added_texts: + itm = by_text.get(t) + if not itm: continue - - log_message = self.create_autofilled_log_item( - log_content=memory, - label=QUERY_LABEL, - from_memory_type=mem_type, - to_memory_type=WORKING_MEMORY_TYPE, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, + key_name = getattr(itm.metadata, "key", None) or itm.memory + k = transform_name_to_key(name=key_name) + memcube_content.append( + {"content": f"[{itm.metadata.memory_type}→{WORKING_MEMORY_TYPE}] {k}:{itm.memory}", "ref_id": itm.id} ) - log_messages.append(log_message) - - logger.info( - f"{len(added_memories)} {LONG_TERM_MEMORY_TYPE} memorie(s) " - f"transformed to {WORKING_MEMORY_TYPE} memories." + meta.append( + { + "ref_id": itm.id, + "id": itm.id, + "key": itm.metadata.key, + "memory": itm.memory, + "memory_type": itm.metadata.memory_type, + "status": itm.metadata.status, + "confidence": itm.metadata.confidence, + "tags": itm.metadata.tags, + "updated_at": getattr(itm.metadata, "updated_at", None) + or getattr(itm.metadata, "update_at", None), + } + ) + ev = self.create_event_log( + label="scheduleMemory", + from_memory_type=TEXT_MEMORY_TYPE, + to_memory_type=WORKING_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=memcube_content, + metadata=meta, + memory_len=len(memcube_content), + memcube_name=self._map_memcube_name(mem_cube_id), ) - log_func_callback(log_messages) + log_func_callback([ev]) @log_exceptions(logger=logger) def log_activation_memory_update( @@ -163,49 +199,48 @@ def log_activation_memory_update( label: str, user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], ): """Log changes when activation memory is updated.""" original_set = set(original_text_memories) new_set = set(new_text_memories) - # Identify changes - added_memories = list(new_set - original_set) # Present in new but not original - - # recording messages - log_messages = [] + added_memories = list(new_set - original_set) + memcube_content = [] + meta = [] for mem in added_memories: - log_message_a = self.create_autofilled_log_item( - log_content=mem, - label=label, - from_memory_type=TEXT_MEMORY_TYPE, - to_memory_type=ACTIVATION_MEMORY_TYPE, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, + key = transform_name_to_key(mem) + ref_id = f"actparam-{hashlib.md5(mem.encode()).hexdigest()}" + memcube_content.append( + {"content": f"[{ACTIVATION_MEMORY_TYPE}→{PARAMETER_MEMORY_TYPE}] {key}:{mem}", "ref_id": ref_id} ) - logger.info( - f"{len(added_memories)} {TEXT_MEMORY_TYPE} memorie(s) " - f"transformed to {ACTIVATION_MEMORY_TYPE} memories." + meta.append( + { + "ref_id": ref_id, + "id": ref_id, + "key": key, + "memory": mem, + "memory_type": ACTIVATION_MEMORY_TYPE, + "status": None, + "confidence": None, + "tags": None, + "updated_at": None, + } ) - - log_message_b = self.create_autofilled_log_item( - log_content=mem, - label=label, - from_memory_type=ACTIVATION_MEMORY_TYPE, - to_memory_type=PARAMETER_MEMORY_TYPE, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, - ) - - log_messages.extend([log_message_a, log_message_b]) - logger.info( - f"{len(added_memories)} {ACTIVATION_MEMORY_TYPE} memorie(s) " - f"transformed to {PARAMETER_MEMORY_TYPE} memories." + ev = self.create_event_log( + label="scheduleMemory", + from_memory_type=ACTIVATION_MEMORY_TYPE, + to_memory_type=PARAMETER_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=memcube_content, + metadata=meta, + memory_len=len(added_memories), + memcube_name=self._map_memcube_name(mem_cube_id), ) - log_func_callback(log_messages) + log_func_callback([ev]) @log_exceptions(logger=logger) def log_adding_memory( @@ -214,10 +249,10 @@ def log_adding_memory( memory_type: str, user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], ): - """Log changes when working memory is replaced.""" + """Deprecated: legacy text log. Use create_event_log with structured fields instead.""" log_message = self.create_autofilled_log_item( log_content=memory, label=ADD_LABEL, @@ -233,6 +268,50 @@ def log_adding_memory( f"converted to {memory_type} memory in mem_cube {mem_cube_id}: {memory}" ) + @log_exceptions(logger=logger) + def log_updating_memory( + self, + memory: str, + memory_type: str, + user_id: str, + mem_cube_id: str, + mem_cube: GeneralMemCube, + log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], + ): + """Deprecated: legacy text log. Use create_event_log with structured fields instead.""" + log_message = self.create_autofilled_log_item( + log_content=memory, + label=MEM_UPDATE_LABEL, + from_memory_type=memory_type, + to_memory_type=memory_type, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + ) + log_func_callback([log_message]) + + @log_exceptions(logger=logger) + def log_archiving_memory( + self, + memory: str, + memory_type: str, + user_id: str, + mem_cube_id: str, + mem_cube: GeneralMemCube, + log_func_callback: Callable[[list[ScheduleLogForWebItem]], None], + ): + """Deprecated: legacy text log. Use create_event_log with structured fields instead.""" + log_message = self.create_autofilled_log_item( + log_content=memory, + label=MEM_ARCHIVE_LABEL, + from_memory_type=memory_type, + to_memory_type=memory_type, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + ) + log_func_callback([log_message]) + @log_exceptions(logger=logger) def validate_schedule_message(self, message: ScheduleMessageItem, label: str): """Validate if the message matches the expected label. diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 92e317881..9cc49b9e5 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -1,11 +1,11 @@ import concurrent.futures +import contextlib import json import traceback from memos.configs.mem_scheduler import GeneralSchedulerConfig from memos.context.context import ContextThreadPoolExecutor from memos.log import get_logger -from memos.mem_cube.base import BaseMemCube from memos.mem_cube.general import GeneralMemCube from memos.mem_scheduler.base_scheduler import BaseScheduler from memos.mem_scheduler.schemas.general_schemas import ( @@ -17,13 +17,19 @@ PREF_ADD_LABEL, QUERY_LABEL, WORKING_MEMORY_TYPE, + USER_INPUT_TYPE, + NOT_APPLICABLE_TYPE, + LONG_TERM_MEMORY_TYPE, MemCubeID, UserID, ) from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.monitor_schemas import QueryMonitorItem -from memos.mem_scheduler.utils.filter_utils import is_all_chinese, is_all_english -from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube +from memos.mem_scheduler.utils.filter_utils import ( + is_all_chinese, + is_all_english, + transform_name_to_key, +) from memos.memories.textual.item import TextualMemoryItem from memos.memories.textual.preference import PreferenceTextMemory from memos.memories.textual.tree import TreeTextMemory @@ -139,7 +145,7 @@ def long_memory_update_process( label=QUERY_LABEL, user_id=user_id, mem_cube_id=mem_cube_id, - mem_cube=self.mem_cube, + mem_cube=self.current_mem_cube, ) def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: @@ -151,18 +157,40 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: """ logger.info(f"Messages {messages} assigned to {QUERY_LABEL} handler.") - # Process the query in a session turn - grouped_messages = group_messages_by_user_and_mem_cube(messages) + grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages) self.validate_schedule_messages(messages=messages, label=QUERY_LABEL) for user_id in grouped_messages: for mem_cube_id in grouped_messages[user_id]: - messages = grouped_messages[user_id][mem_cube_id] - if len(messages) == 0: - return + batch = grouped_messages[user_id][mem_cube_id] + if not batch: + continue + try: + for msg in batch: + event = self.create_event_log( + label="addMessage", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=NOT_APPLICABLE_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=[ + { + "content": f"[User] {msg.content}", + "ref_id": msg.item_id, + "role": "user", + } + ], + metadata=[], + memory_len=1, + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + self._submit_web_logs([event]) + except Exception: + logger.exception("Failed to record addMessage log for query") self.long_memory_update_process( - user_id=user_id, mem_cube_id=mem_cube_id, messages=messages + user_id=user_id, mem_cube_id=mem_cube_id, messages=batch ) def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: @@ -173,63 +201,152 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: messages: List of answer messages to process """ logger.info(f"Messages {messages} assigned to {ANSWER_LABEL} handler.") - # Process the query in a session turn - grouped_messages = group_messages_by_user_and_mem_cube(messages) + grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages) self.validate_schedule_messages(messages=messages, label=ANSWER_LABEL) for user_id in grouped_messages: for mem_cube_id in grouped_messages[user_id]: - messages = grouped_messages[user_id][mem_cube_id] - if len(messages) == 0: - return + batch = grouped_messages[user_id][mem_cube_id] + if not batch: + continue + try: + for msg in batch: + event = self.create_event_log( + label="addMessage", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=NOT_APPLICABLE_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=[ + { + "content": f"[Assistant] {msg.content}", + "ref_id": msg.item_id, + "role": "assistant", + } + ], + metadata=[], + memory_len=1, + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + self._submit_web_logs([event]) + except Exception: + logger.exception("Failed to record addMessage log for answer") def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") # Process the query in a session turn - grouped_messages = group_messages_by_user_and_mem_cube(messages) - mem_cube = self.mem_cube + grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages) self.validate_schedule_messages(messages=messages, label=ADD_LABEL) try: for user_id in grouped_messages: for mem_cube_id in grouped_messages[user_id]: - messages = grouped_messages[user_id][mem_cube_id] - if len(messages) == 0: - return + batch = grouped_messages[user_id][mem_cube_id] + if not batch: + continue - # submit logs - for msg in messages: + for msg in batch: try: userinput_memory_ids = json.loads(msg.content) except Exception as e: logger.error(f"Error: {e}. Content: {msg.content}", exc_info=True) userinput_memory_ids = [] + mem_items: list[TextualMemoryItem] = [] for memory_id in userinput_memory_ids: try: - mem_item: TextualMemoryItem = mem_cube.text_mem.get( + mem_item: TextualMemoryItem = self.current_mem_cube.text_mem.get( memory_id=memory_id ) + mem_items.append(mem_item) except Exception: logger.warning( f"This MemoryItem {memory_id} has already been deleted." ) continue - mem_type = mem_item.metadata.memory_type - mem_content = mem_item.memory - - if mem_type == WORKING_MEMORY_TYPE: + add_content: list[dict] = [] + add_meta: list[dict] = [] + update_content: list[dict] = [] + update_meta: list[dict] = [] + for mem_item in mem_items: + if mem_item.metadata.memory_type == WORKING_MEMORY_TYPE: continue - - self.log_adding_memory( - memory=mem_content, - memory_type=mem_type, - user_id=msg.user_id, - mem_cube_id=msg.mem_cube_id, - mem_cube=self.mem_cube, - log_func_callback=self._submit_web_logs, + key = getattr(mem_item.metadata, "key", None) or transform_name_to_key( + name=mem_item.memory ) + exists = False + try: + text_mem = self.current_mem_cube.text_mem + if key and hasattr(text_mem, "graph_store"): + candidates = text_mem.graph_store.get_by_metadata( + [ + {"field": "memory", "op": "=", "value": key}, + { + "field": "memory_type", + "op": "=", + "value": mem_item.metadata.memory_type, + }, + ] + ) + exists = bool(candidates) + except Exception: + exists = False + + payload = {"content": f"{key}:{mem_item.memory}", "ref_id": mem_item.id} + meta_dict = { + "ref_id": mem_item.id, + "id": mem_item.id, + "key": mem_item.metadata.key, + "memory": mem_item.memory, + "memory_type": mem_item.metadata.memory_type, + "status": mem_item.metadata.status, + "confidence": mem_item.metadata.confidence, + "tags": mem_item.metadata.tags, + "updated_at": getattr(mem_item.metadata, "updated_at", None) + or getattr(mem_item.metadata, "update_at", None), + } + if exists: + update_content.append(payload) + update_meta.append(meta_dict) + else: + add_content.append(payload) + add_meta.append(meta_dict) + + events = [] + if add_content: + events.append( + self.create_event_log( + label="addMemory", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=add_content, + metadata=add_meta, + memory_len=len(add_content), + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + ) + if update_content: + events.append( + self.create_event_log( + label="updateMemory", + from_memory_type=LONG_TERM_MEMORY_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=update_content, + metadata=update_meta, + memory_len=len(update_content), + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + ) + if events: + self._submit_web_logs(events) except Exception as e: logger.error(f"Error: {e}", exc_info=True) @@ -241,7 +358,7 @@ def process_message(message: ScheduleMessageItem): try: user_id = message.user_id mem_cube_id = message.mem_cube_id - mem_cube = self.mem_cube + mem_cube = self.current_mem_cube content = message.content user_name = message.user_name @@ -402,7 +519,7 @@ def process_message(message: ScheduleMessageItem): try: user_id = message.user_id mem_cube_id = message.mem_cube_id - mem_cube = self.mem_cube + mem_cube = self.current_mem_cube content = message.content user_name = message.user_name @@ -431,6 +548,124 @@ def process_message(message: ScheduleMessageItem): user_name=user_name, ) + with contextlib.suppress(Exception): + mem_items: list[TextualMemoryItem] = [] + for mid in mem_ids: + with contextlib.suppress(Exception): + mem_items.append(text_mem.get(mid)) + if len(mem_items) > 1: + keys: list[str] = [] + memcube_content: list[dict] = [] + meta: list[dict] = [] + merged_target_ids: set[str] = set() + with contextlib.suppress(Exception): + if hasattr(text_mem, "graph_store"): + for mid in mem_ids: + edges = text_mem.graph_store.get_edges( + mid, type="MERGED_TO", direction="OUT" + ) + for edge in edges: + target = edge.get("to") or edge.get("dst") or edge.get("target") + if target: + merged_target_ids.add(target) + for item in mem_items: + key = getattr(getattr(item, "metadata", {}), "key", None) or transform_name_to_key( + getattr(item, "memory", "") + ) + keys.append(key) + memcube_content.append( + {"content": key or "(no key)", "ref_id": item.id, "type": "merged"} + ) + meta.append( + { + "ref_id": item.id, + "id": item.id, + "key": key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), + } + ) + combined_key = keys[0] if keys else "" + post_ref_id = None + post_meta = { + "ref_id": None, + "id": None, + "key": None, + "memory": None, + "memory_type": None, + "status": None, + "confidence": None, + "tags": None, + "updated_at": None, + } + if merged_target_ids: + post_ref_id = list(merged_target_ids)[0] + with contextlib.suppress(Exception): + merged_item = text_mem.get(post_ref_id) + combined_key = getattr( + getattr(merged_item, "metadata", {}), "key", None + ) or combined_key + post_meta = { + "ref_id": post_ref_id, + "id": post_ref_id, + "key": getattr( + getattr(merged_item, "metadata", {}), "key", None + ), + "memory": getattr(merged_item, "memory", None), + "memory_type": getattr( + getattr(merged_item, "metadata", {}), "memory_type", None + ), + "status": getattr( + getattr(merged_item, "metadata", {}), "status", None + ), + "confidence": getattr( + getattr(merged_item, "metadata", {}), "confidence", None + ), + "tags": getattr(getattr(merged_item, "metadata", {}), "tags", None), + "updated_at": getattr( + getattr(merged_item, "metadata", {}), "updated_at", None + ) + or getattr( + getattr(merged_item, "metadata", {}), "update_at", None + ), + } + if not post_ref_id: + import hashlib + + post_ref_id = f"merge-{hashlib.md5(''.join(sorted(mem_ids)).encode()).hexdigest()}" + post_meta["ref_id"] = post_ref_id + post_meta["id"] = post_ref_id + if not post_meta.get("key"): + post_meta["key"] = combined_key + if not keys: + keys = [item.id for item in mem_items] + memcube_content.append( + { + "content": combined_key if combined_key else "(no key)", + "ref_id": post_ref_id, + "type": "postMerge", + } + ) + meta.append(post_meta) + event = self.create_event_log( + label="mergeMemory", + from_memory_type=LONG_TERM_MEMORY_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=memcube_content, + metadata=meta, + memory_len=len(keys), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + self._submit_web_logs([event]) + logger.info( f"Successfully processed mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}" ) @@ -451,7 +686,7 @@ def _process_memories_with_reorganize( mem_ids: list[str], user_id: str, mem_cube_id: str, - mem_cube: BaseMemCube, + mem_cube: GeneralMemCube, text_mem: TreeTextMemory, user_name: str, ) -> None: @@ -503,7 +738,7 @@ def _pref_add_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non def process_message(message: ScheduleMessageItem): try: - mem_cube = self.mem_cube + mem_cube = self.current_mem_cube user_id = message.user_id session_id = message.session_id diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index 8dd51c5bd..089a7cc6c 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -29,6 +29,8 @@ class FineStrategy(str, Enum): ADD_LABEL = "add" MEM_READ_LABEL = "mem_read" MEM_ORGANIZE_LABEL = "mem_organize" +MEM_UPDATE_LABEL = "mem_update" +MEM_ARCHIVE_LABEL = "mem_archive" API_MIX_SEARCH_LABEL = "api_mix_search" PREF_ADD_LABEL = "pref_add" diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index f1d48f3f1..4e030825b 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -133,6 +133,18 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin): default_factory=get_utc_now, description="Timestamp indicating when the log entry was created", ) + memcube_log_content: list[dict] | None = Field( + default=None, description="Structured memcube log content list" + ) + metadata: list[dict] | None = Field( + default=None, description="Structured metadata list for each log item" + ) + memcube_name: str | None = Field( + default=None, description="Display name for memcube" + ) + memory_len: int | None = Field( + default=None, description="Count of items involved in the event" + ) def debug_info(self) -> dict[str, Any]: """Return structured debug information for logging purposes.""" From d91eaaada8ccc5480ae415b80c5d9593f9d73cf3 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 20 Nov 2025 19:57:30 +0800 Subject: [PATCH 2/5] refactor: replace fullwidth punctuation with halfwidth in log content MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace fullwidth colon (:) with halfwidth colon (:) in all content fields - Update example file to use English UI text instead of Chinese for consistency - Ensure backend sends neutral data format for frontend i18n handling Changes: - scheduler_logger.py: Use halfwidth colon in content formatting - general_scheduler.py: Use halfwidth colon in content formatting - memos_w_scheduler.py: Replace Chinese UI text with English equivalents --- examples/mem_scheduler/memos_w_scheduler.py | 42 +++++++++---------- .../general_modules/scheduler_logger.py | 4 +- src/memos/mem_scheduler/general_scheduler.py | 2 +- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/examples/mem_scheduler/memos_w_scheduler.py b/examples/mem_scheduler/memos_w_scheduler.py index 3a4fbbd57..17bfd3993 100644 --- a/examples/mem_scheduler/memos_w_scheduler.py +++ b/examples/mem_scheduler/memos_w_scheduler.py @@ -82,7 +82,7 @@ def _truncate_with_rules(text: str) -> str: normalized = text.strip().replace("\n", " ") if len(normalized) <= limit: return normalized - return normalized[: limit - 1] + "… 查看详情" + return normalized[:limit] + "..." def _format_title(ts: datetime, title_text: str) -> str: @@ -95,14 +95,14 @@ def _cube_display_from(mem_cube_id: str) -> str: return "UserMemCube" -_TYPE_ZH = { - "LongTermMemory": "长期", - "UserMemory": "用户", - "WorkingMemory": "工作", - "ActivationMemory": "激活", - "ParameterMemory": "参数", - "TextMemory": "明文", - "UserInput": "消息", +_TYPE_SHORT = { + "LongTermMemory": "LTM", + "UserMemory": "User", + "WorkingMemory": "Working", + "ActivationMemory": "Activation", + "ParameterMemory": "Parameter", + "TextMemory": "Text", + "UserInput": "Input", "NotApplicable": "NA", } @@ -121,43 +121,43 @@ def _first_content() -> str: if label in ("addMessage", QUERY_LABEL, ANSWER_LABEL): target_cube = cube_display.replace("MemCube", "") - title = _format_title(item.timestamp, f"addMessages至{target_cube} MemCube") + title = _format_title(item.timestamp, f"addMessages to {target_cube} MemCube") return title, _truncate_with_rules(_first_content()) if label in ("addMemory", ADD_LABEL): - title = _format_title(item.timestamp, f"{cube_display}新增了{memory_len}条记忆") + title = _format_title(item.timestamp, f"{cube_display} added {memory_len} memories") return title, _truncate_with_rules(_first_content()) if label in ("updateMemory", MEM_UPDATE_LABEL): - title = _format_title(item.timestamp, f"{cube_display}更新了{memory_len}条记忆") + title = _format_title(item.timestamp, f"{cube_display} updated {memory_len} memories") return title, _truncate_with_rules(_first_content()) if label in ("archiveMemory", MEM_ARCHIVE_LABEL): - title = _format_title(item.timestamp, f"{cube_display}归档了{memory_len}条记忆") + title = _format_title(item.timestamp, f"{cube_display} archived {memory_len} memories") return title, _truncate_with_rules(_first_content()) if label in ("mergeMemory", MEM_ORGANIZE_LABEL): - title = _format_title(item.timestamp, f"{cube_display}合并了{memory_len}条记忆") + title = _format_title(item.timestamp, f"{cube_display} merged {memory_len} memories") merged = [c for c in memcube_content if c.get("type") == "merged"] post = [c for c in memcube_content if c.get("type") == "postMerge"] parts = [] if merged: - parts.append("被合并的记忆: " + " | ".join(c.get("content", "") for c in merged)) + parts.append("Merged: " + " | ".join(c.get("content", "") for c in merged)) if post: - parts.append("合并后的记忆: " + " | ".join(c.get("content", "") for c in post)) + parts.append("Result: " + " | ".join(c.get("content", "") for c in post)) detail = " ".join(parts) if parts else _first_content() return title, _truncate_with_rules(detail) if label == "scheduleMemory": - title = _format_title(item.timestamp, f"{cube_display}调度了{memory_len}条记忆") + title = _format_title(item.timestamp, f"{cube_display} scheduled {memory_len} memories") if memcube_content: return title, _truncate_with_rules(memcube_content[0].get("content", "")) key = transform_name_to_key(content) - from_zh = _TYPE_ZH.get(item.from_memory_type, item.from_memory_type) - to_zh = _TYPE_ZH.get(item.to_memory_type, item.to_memory_type) - return title, _truncate_with_rules(f"[{from_zh}→{to_zh}] {key}:{content}") + from_short = _TYPE_SHORT.get(item.from_memory_type, item.from_memory_type) + to_short = _TYPE_SHORT.get(item.to_memory_type, item.to_memory_type) + return title, _truncate_with_rules(f"[{from_short}→{to_short}] {key}: {content}") - title = _format_title(item.timestamp, f"{cube_display}事件") + title = _format_title(item.timestamp, f"{cube_display} event") return title, _truncate_with_rules(_first_content()) diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index de66b6f38..ca1543451 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -161,7 +161,7 @@ def log_working_memory_replacement( key_name = getattr(itm.metadata, "key", None) or itm.memory k = transform_name_to_key(name=key_name) memcube_content.append( - {"content": f"[{itm.metadata.memory_type}→{WORKING_MEMORY_TYPE}] {k}:{itm.memory}", "ref_id": itm.id} + {"content": f"[{itm.metadata.memory_type}→{WORKING_MEMORY_TYPE}] {k}: {itm.memory}", "ref_id": itm.id} ) meta.append( { @@ -213,7 +213,7 @@ def log_activation_memory_update( key = transform_name_to_key(mem) ref_id = f"actparam-{hashlib.md5(mem.encode()).hexdigest()}" memcube_content.append( - {"content": f"[{ACTIVATION_MEMORY_TYPE}→{PARAMETER_MEMORY_TYPE}] {key}:{mem}", "ref_id": ref_id} + {"content": f"[{ACTIVATION_MEMORY_TYPE}→{PARAMETER_MEMORY_TYPE}] {key}: {mem}", "ref_id": ref_id} ) meta.append( { diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 9cc49b9e5..d274463bd 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -294,7 +294,7 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: except Exception: exists = False - payload = {"content": f"{key}:{mem_item.memory}", "ref_id": mem_item.id} + payload = {"content": f"{key}: {mem_item.memory}", "ref_id": mem_item.id} meta_dict = { "ref_id": mem_item.id, "id": mem_item.id, From 252e49555c15290ad60ba93616d9b5c14234cc6c Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 20 Nov 2025 20:17:15 +0800 Subject: [PATCH 3/5] style: fix RUF015 linter warning Replace list(merged_target_ids)[0] with next(iter(merged_target_ids)) for better performance and readability. --- src/memos/mem_scheduler/general_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index d274463bd..57895a28d 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -604,7 +604,7 @@ def process_message(message: ScheduleMessageItem): "updated_at": None, } if merged_target_ids: - post_ref_id = list(merged_target_ids)[0] + post_ref_id = next(iter(merged_target_ids)) with contextlib.suppress(Exception): merged_item = text_mem.get(post_ref_id) combined_key = getattr( From 950b36bd9c5706646d2b5acba90748ec08598fc9 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 20 Nov 2025 20:23:38 +0800 Subject: [PATCH 4/5] style: apply ruff formatting - Format long lines for better readability - Align dictionary entries and function parameters - Follow project code style guidelines --- src/memos/mem_scheduler/base_scheduler.py | 4 ++- .../general_modules/scheduler_logger.py | 12 ++++++--- src/memos/mem_scheduler/general_scheduler.py | 26 ++++++++++++------- .../mem_scheduler/schemas/message_schemas.py | 8 ++---- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 7cdaabadf..63b87157c 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -621,7 +621,9 @@ def _normalize_item(item: ScheduleLogForWebItem) -> dict: def _with_memory_time(meta: dict) -> dict: enriched = dict(meta) if "memory_time" not in enriched: - enriched["memory_time"] = enriched.get("updated_at") or enriched.get("update_at") + enriched["memory_time"] = enriched.get("updated_at") or enriched.get( + "update_at" + ) return enriched data["metadata"] = [_with_memory_time(m) for m in metadata] diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index ca1543451..3859c9e6f 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -46,7 +46,7 @@ def create_autofilled_log_item( user_id: str, mem_cube_id: str, mem_cube: GeneralMemCube, - ) -> ScheduleLogForWebItem: + ) -> ScheduleLogForWebItem: text_mem_base: TreeTextMemory = mem_cube.text_mem current_memory_sizes = text_mem_base.get_current_memory_size() current_memory_sizes = { @@ -161,7 +161,10 @@ def log_working_memory_replacement( key_name = getattr(itm.metadata, "key", None) or itm.memory k = transform_name_to_key(name=key_name) memcube_content.append( - {"content": f"[{itm.metadata.memory_type}→{WORKING_MEMORY_TYPE}] {k}: {itm.memory}", "ref_id": itm.id} + { + "content": f"[{itm.metadata.memory_type}→{WORKING_MEMORY_TYPE}] {k}: {itm.memory}", + "ref_id": itm.id, + } ) meta.append( { @@ -213,7 +216,10 @@ def log_activation_memory_update( key = transform_name_to_key(mem) ref_id = f"actparam-{hashlib.md5(mem.encode()).hexdigest()}" memcube_content.append( - {"content": f"[{ACTIVATION_MEMORY_TYPE}→{PARAMETER_MEMORY_TYPE}] {key}: {mem}", "ref_id": ref_id} + { + "content": f"[{ACTIVATION_MEMORY_TYPE}→{PARAMETER_MEMORY_TYPE}] {key}: {mem}", + "ref_id": ref_id, + } ) meta.append( { diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 57895a28d..eeca890a9 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -294,7 +294,10 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: except Exception: exists = False - payload = {"content": f"{key}: {mem_item.memory}", "ref_id": mem_item.id} + payload = { + "content": f"{key}: {mem_item.memory}", + "ref_id": mem_item.id, + } meta_dict = { "ref_id": mem_item.id, "id": mem_item.id, @@ -565,13 +568,15 @@ def process_message(message: ScheduleMessageItem): mid, type="MERGED_TO", direction="OUT" ) for edge in edges: - target = edge.get("to") or edge.get("dst") or edge.get("target") + target = ( + edge.get("to") or edge.get("dst") or edge.get("target") + ) if target: merged_target_ids.add(target) for item in mem_items: - key = getattr(getattr(item, "metadata", {}), "key", None) or transform_name_to_key( - getattr(item, "memory", "") - ) + key = getattr( + getattr(item, "metadata", {}), "key", None + ) or transform_name_to_key(getattr(item, "memory", "")) keys.append(key) memcube_content.append( {"content": key or "(no key)", "ref_id": item.id, "type": "merged"} @@ -607,9 +612,10 @@ def process_message(message: ScheduleMessageItem): post_ref_id = next(iter(merged_target_ids)) with contextlib.suppress(Exception): merged_item = text_mem.get(post_ref_id) - combined_key = getattr( - getattr(merged_item, "metadata", {}), "key", None - ) or combined_key + combined_key = ( + getattr(getattr(merged_item, "metadata", {}), "key", None) + or combined_key + ) post_meta = { "ref_id": post_ref_id, "id": post_ref_id, @@ -626,7 +632,9 @@ def process_message(message: ScheduleMessageItem): "confidence": getattr( getattr(merged_item, "metadata", {}), "confidence", None ), - "tags": getattr(getattr(merged_item, "metadata", {}), "tags", None), + "tags": getattr( + getattr(merged_item, "metadata", {}), "tags", None + ), "updated_at": getattr( getattr(merged_item, "metadata", {}), "updated_at", None ) diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index 4e030825b..d7e94e0e1 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -139,12 +139,8 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin): metadata: list[dict] | None = Field( default=None, description="Structured metadata list for each log item" ) - memcube_name: str | None = Field( - default=None, description="Display name for memcube" - ) - memory_len: int | None = Field( - default=None, description="Count of items involved in the event" - ) + memcube_name: str | None = Field(default=None, description="Display name for memcube") + memory_len: int | None = Field(default=None, description="Count of items involved in the event") def debug_info(self) -> dict[str, Any]: """Return structured debug information for logging purposes.""" From 0e447dfb1d210baeb9648353e3b2fe478b937db3 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 20 Nov 2025 20:27:25 +0800 Subject: [PATCH 5/5] style: format server_router.py (inherited from dev branch) --- src/memos/api/routers/server_router.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index b02569e60..b3b517305 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -98,6 +98,7 @@ def add_memories(add_req: APIADDRequest): # Scheduler API Endpoints # ============================================================================= + @router.get("/scheduler/status", summary="Get scheduler running status") def scheduler_status(user_name: str | None = None): """Get scheduler running status."""