From 7efec70a9b0b75883555f0e903b5ffff879a1087 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 27 Nov 2025 16:24:46 +0800 Subject: [PATCH] fix: Make from_memory_type and to_memory_type optional and add task_id propagation - Make from_memory_type and to_memory_type fields optional in ScheduleLogForWebItem - This fixes RabbitMQ log submission validation errors in cloud service scenario - Add task_id field to ScheduleMessageItem and ScheduleLogForWebItem - Propagate task_id from API request through scheduler to web logs - Add logging for preference memory additions in _pref_add_message_consumer Fixes validation error: '2 validation errors for ScheduleLogForWebItem from_memory_type Field required to_memory_type Field required' Changes: - src/memos/mem_scheduler/schemas/message_schemas.py: Add task_id fields - src/memos/multi_mem_cube/single_cube.py: Pass task_id to ScheduleMessageItem - src/memos/mem_scheduler/general_scheduler.py: Propagate task_id to logs --- src/memos/mem_scheduler/general_scheduler.py | 46 +++++++++++++++++++ .../mem_scheduler/schemas/message_schemas.py | 6 ++- src/memos/multi_mem_cube/single_cube.py | 3 ++ 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index d7c3e65f..ac2ea2bf 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -189,6 +189,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: memory_len=1, memcube_name=self._map_memcube_name(msg.mem_cube_id), ) + event.task_id = msg.task_id self._submit_web_logs([event]) except Exception: logger.exception("Failed to record addMessage log for query") @@ -233,6 +234,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: memory_len=1, memcube_name=self._map_memcube_name(msg.mem_cube_id), ) + event.task_id = msg.task_id self._submit_web_logs([event]) except Exception: logger.exception("Failed to record addMessage log for answer") @@ -798,6 +800,50 @@ def process_message(message: ScheduleMessageItem): f"Successfully processed and add preferences for user_id={user_id}, mem_cube_id={mem_cube_id}, pref_ids={pref_ids}" ) + # Create and submit log for web display + # Only send logs if RabbitMQ is configured with direct exchange (cloud service scenario) + should_send_log = ( + self.rabbitmq_config is not None + and hasattr(self.rabbitmq_config, "exchange_type") + and self.rabbitmq_config.exchange_type == "direct" + ) + if pref_ids and should_send_log: + pref_content = [] + pref_meta = [] + for i, pref_mem_item in enumerate(pref_memories): + if i < len(pref_ids): + pref_content.append( + { + "content": pref_mem_item.memory, + "ref_id": pref_ids[i], + } + ) + pref_meta.append( + { + "ref_id": pref_ids[i], + "id": pref_ids[i], + "memory": pref_mem_item.memory, + "memory_type": getattr( + pref_mem_item.metadata, "memory_type", "preference" + ), + } + ) + + event = self.create_event_log( + label="addMemory", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=pref_content, + metadata=pref_meta, + memory_len=len(pref_content), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.task_id = message.task_id + self._submit_web_logs([event]) + except Exception as e: logger.error(f"Error processing pref_add message: {e}", exc_info=True) diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index 2bd6ef1e..2f406e21 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -33,6 +33,7 @@ class ScheduleMessageItem(BaseModel, DictConversionMixin): item_id: str = Field(description="uuid", default_factory=lambda: str(uuid4())) + task_id: str | None = Field(default=None, description="Parent task ID, if applicable") redis_message_id: str = Field(default="", description="the message get from redis stream") user_id: str = Field(..., description="user id") mem_cube_id: str = Field(..., description="memcube id") @@ -114,13 +115,14 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin): item_id: str = Field( description="Unique identifier for the log entry", default_factory=lambda: str(uuid4()) ) + task_id: str | None = Field(default=None, description="Identifier for the parent task") user_id: str = Field(..., description="Identifier for the user associated with the log") mem_cube_id: str = Field( ..., description="Identifier for the memcube associated with this log entry" ) label: str = Field(..., description="Label categorizing the type of log") - from_memory_type: str = Field(..., description="Source memory type") - to_memory_type: str = Field(..., description="Destination memory type") + from_memory_type: str | None = Field(None, description="Source memory type") + to_memory_type: str | None = Field(None, description="Destination memory type") log_content: str = Field(..., description="Detailed content of the log entry") current_memory_sizes: MemorySizes = Field( default_factory=lambda: dict(DEFAULT_MEMORY_SIZES), diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 8e37cb92..2b79a416 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -427,6 +427,7 @@ def _schedule_memory_tasks( try: message_item_read = ScheduleMessageItem( user_id=add_req.user_id, + task_id=add_req.task_id, session_id=target_session_id, mem_cube_id=self.cube_id, mem_cube=self.naive_mem_cube, @@ -448,6 +449,7 @@ def _schedule_memory_tasks( else: message_item_add = ScheduleMessageItem( user_id=add_req.user_id, + task_id=add_req.task_id, session_id=target_session_id, mem_cube_id=self.cube_id, mem_cube=self.naive_mem_cube, @@ -487,6 +489,7 @@ def _process_pref_mem( messages_list = [add_req.messages] message_item_pref = ScheduleMessageItem( user_id=add_req.user_id, + task_id=add_req.task_id, session_id=target_session_id, mem_cube_id=self.cube_id, mem_cube=self.naive_mem_cube,