Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/memos/mem_scheduler/general_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
memory_len=1,
memcube_name=self._map_memcube_name(msg.mem_cube_id),
)
event.task_id = msg.task_id
self._submit_web_logs([event])
except Exception:
logger.exception("Failed to record addMessage log for query")
Expand Down Expand Up @@ -233,6 +234,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
memory_len=1,
memcube_name=self._map_memcube_name(msg.mem_cube_id),
)
event.task_id = msg.task_id
self._submit_web_logs([event])
except Exception:
logger.exception("Failed to record addMessage log for answer")
Expand Down Expand Up @@ -798,6 +800,50 @@ def process_message(message: ScheduleMessageItem):
f"Successfully processed and add preferences for user_id={user_id}, mem_cube_id={mem_cube_id}, pref_ids={pref_ids}"
)

# Create and submit log for web display
# Only send logs if RabbitMQ is configured with direct exchange (cloud service scenario)
should_send_log = (
self.rabbitmq_config is not None
and hasattr(self.rabbitmq_config, "exchange_type")
and self.rabbitmq_config.exchange_type == "direct"
)
if pref_ids and should_send_log:
pref_content = []
pref_meta = []
for i, pref_mem_item in enumerate(pref_memories):
if i < len(pref_ids):
pref_content.append(
{
"content": pref_mem_item.memory,
"ref_id": pref_ids[i],
}
)
pref_meta.append(
{
"ref_id": pref_ids[i],
"id": pref_ids[i],
"memory": pref_mem_item.memory,
"memory_type": getattr(
pref_mem_item.metadata, "memory_type", "preference"
),
}
)

event = self.create_event_log(
label="addMemory",
from_memory_type=USER_INPUT_TYPE,
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
memcube_log_content=pref_content,
metadata=pref_meta,
memory_len=len(pref_content),
memcube_name=self._map_memcube_name(mem_cube_id),
)
event.task_id = message.task_id
self._submit_web_logs([event])

except Exception as e:
logger.error(f"Error processing pref_add message: {e}", exc_info=True)

Expand Down
6 changes: 4 additions & 2 deletions src/memos/mem_scheduler/schemas/message_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

class ScheduleMessageItem(BaseModel, DictConversionMixin):
item_id: str = Field(description="uuid", default_factory=lambda: str(uuid4()))
task_id: str | None = Field(default=None, description="Parent task ID, if applicable")
redis_message_id: str = Field(default="", description="the message get from redis stream")
user_id: str = Field(..., description="user id")
mem_cube_id: str = Field(..., description="memcube id")
Expand Down Expand Up @@ -114,13 +115,14 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin):
item_id: str = Field(
description="Unique identifier for the log entry", default_factory=lambda: str(uuid4())
)
task_id: str | None = Field(default=None, description="Identifier for the parent task")
user_id: str = Field(..., description="Identifier for the user associated with the log")
mem_cube_id: str = Field(
..., description="Identifier for the memcube associated with this log entry"
)
label: str = Field(..., description="Label categorizing the type of log")
from_memory_type: str = Field(..., description="Source memory type")
to_memory_type: str = Field(..., description="Destination memory type")
from_memory_type: str | None = Field(None, description="Source memory type")
to_memory_type: str | None = Field(None, description="Destination memory type")
log_content: str = Field(..., description="Detailed content of the log entry")
current_memory_sizes: MemorySizes = Field(
default_factory=lambda: dict(DEFAULT_MEMORY_SIZES),
Expand Down
3 changes: 3 additions & 0 deletions src/memos/multi_mem_cube/single_cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ def _schedule_memory_tasks(
try:
message_item_read = ScheduleMessageItem(
user_id=add_req.user_id,
task_id=add_req.task_id,
session_id=target_session_id,
mem_cube_id=self.cube_id,
mem_cube=self.naive_mem_cube,
Expand All @@ -448,6 +449,7 @@ def _schedule_memory_tasks(
else:
message_item_add = ScheduleMessageItem(
user_id=add_req.user_id,
task_id=add_req.task_id,
session_id=target_session_id,
mem_cube_id=self.cube_id,
mem_cube=self.naive_mem_cube,
Expand Down Expand Up @@ -487,6 +489,7 @@ def _process_pref_mem(
messages_list = [add_req.messages]
message_item_pref = ScheduleMessageItem(
user_id=add_req.user_id,
task_id=add_req.task_id,
session_id=target_session_id,
mem_cube_id=self.cube_id,
mem_cube=self.naive_mem_cube,
Expand Down
Loading