Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 107 additions & 15 deletions examples/mem_scheduler/memos_w_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,29 @@

from pathlib import Path
from queue import Queue
from typing import TYPE_CHECKING

from memos.configs.mem_cube import GeneralMemCubeConfig
from memos.configs.mem_os import MOSConfig
from datetime import datetime
import re

from memos.configs.mem_scheduler import AuthConfig
from memos.log import get_logger
from memos.mem_cube.general import GeneralMemCube
from memos.mem_os.main import MOS
from memos.mem_scheduler.schemas.general_schemas import (
QUERY_LABEL,
ANSWER_LABEL,
ADD_LABEL,
MEM_ORGANIZE_LABEL,
MEM_UPDATE_LABEL,
MEM_ARCHIVE_LABEL,
NOT_APPLICABLE_TYPE,
)
from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem
from memos.mem_scheduler.utils.filter_utils import transform_name_to_key
from memos.mem_scheduler.general_scheduler import GeneralScheduler


if TYPE_CHECKING:
from memos.mem_scheduler.schemas.message_schemas import (
ScheduleLogForWebItem,
)


FILE_PATH = Path(__file__).absolute()
BASE_DIR = FILE_PATH.parent.parent.parent
sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory
Expand Down Expand Up @@ -70,6 +76,91 @@ def init_task():
return conversations, questions


def _truncate_with_rules(text: str) -> str:
has_cjk = bool(re.search(r"[\u4e00-\u9fff]", text))
limit = 32 if has_cjk else 64
normalized = text.strip().replace("\n", " ")
if len(normalized) <= limit:
return normalized
return normalized[:limit] + "..."


def _format_title(ts: datetime, title_text: str) -> str:
return f"{ts.astimezone().strftime('%H:%M:%S')} {title_text}"


def _cube_display_from(mem_cube_id: str) -> str:
if "public" in (mem_cube_id or "").lower():
return "PublicMemCube"
return "UserMemCube"


_TYPE_SHORT = {
"LongTermMemory": "LTM",
"UserMemory": "User",
"WorkingMemory": "Working",
"ActivationMemory": "Activation",
"ParameterMemory": "Parameter",
"TextMemory": "Text",
"UserInput": "Input",
"NotApplicable": "NA",
}


def _format_entry(item: ScheduleLogForWebItem) -> tuple[str, str]:
cube_display = getattr(item, "memcube_name", None) or _cube_display_from(item.mem_cube_id)
label = item.label
content = item.log_content or ""
memcube_content = getattr(item, "memcube_log_content", None) or []
memory_len = getattr(item, "memory_len", None) or len(memcube_content) or 1

def _first_content() -> str:
if memcube_content:
return memcube_content[0].get("content", "") or content
return content

if label in ("addMessage", QUERY_LABEL, ANSWER_LABEL):
target_cube = cube_display.replace("MemCube", "")
title = _format_title(item.timestamp, f"addMessages to {target_cube} MemCube")
return title, _truncate_with_rules(_first_content())

if label in ("addMemory", ADD_LABEL):
title = _format_title(item.timestamp, f"{cube_display} added {memory_len} memories")
return title, _truncate_with_rules(_first_content())

if label in ("updateMemory", MEM_UPDATE_LABEL):
title = _format_title(item.timestamp, f"{cube_display} updated {memory_len} memories")
return title, _truncate_with_rules(_first_content())

if label in ("archiveMemory", MEM_ARCHIVE_LABEL):
title = _format_title(item.timestamp, f"{cube_display} archived {memory_len} memories")
return title, _truncate_with_rules(_first_content())

if label in ("mergeMemory", MEM_ORGANIZE_LABEL):
title = _format_title(item.timestamp, f"{cube_display} merged {memory_len} memories")
merged = [c for c in memcube_content if c.get("type") == "merged"]
post = [c for c in memcube_content if c.get("type") == "postMerge"]
parts = []
if merged:
parts.append("Merged: " + " | ".join(c.get("content", "") for c in merged))
if post:
parts.append("Result: " + " | ".join(c.get("content", "") for c in post))
detail = " ".join(parts) if parts else _first_content()
return title, _truncate_with_rules(detail)

if label == "scheduleMemory":
title = _format_title(item.timestamp, f"{cube_display} scheduled {memory_len} memories")
if memcube_content:
return title, _truncate_with_rules(memcube_content[0].get("content", ""))
key = transform_name_to_key(content)
from_short = _TYPE_SHORT.get(item.from_memory_type, item.from_memory_type)
to_short = _TYPE_SHORT.get(item.to_memory_type, item.to_memory_type)
return title, _truncate_with_rules(f"[{from_short}→{to_short}] {key}: {content}")

title = _format_title(item.timestamp, f"{cube_display} event")
return title, _truncate_with_rules(_first_content())


def show_web_logs(mem_scheduler: GeneralScheduler):
"""Display all web log entries from the scheduler's log queue.

Expand All @@ -84,24 +175,25 @@ def show_web_logs(mem_scheduler: GeneralScheduler):

# Create a temporary queue to preserve the original queue contents
temp_queue = Queue()
log_count = 0
collected: list[ScheduleLogForWebItem] = []

while not mem_scheduler._web_log_message_queue.empty():
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
collected.append(log_item)
temp_queue.put(log_item)
log_count += 1

# Print log entry details
print(f"\nLog Entry #{log_count}:")
print(f'- "{log_item.label}" log: {log_item}')

for idx, log_item in enumerate(sorted(collected, key=lambda x: x.timestamp, reverse=True), 1):
title, content = _format_entry(log_item)
print(f"\nLog Entry #{idx}:")
print(title)
print(content)
print("-" * 50)

# Restore items back to the original queue
while not temp_queue.empty():
mem_scheduler._web_log_message_queue.put(temp_queue.get())

print(f"\nTotal {log_count} web log entries displayed.")
print(f"\nTotal {len(collected)} web log entries displayed.")
print("=" * 110 + "\n")


Expand Down
1 change: 1 addition & 0 deletions src/memos/api/routers/server_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def add_memories(add_req: APIADDRequest):
# Scheduler API Endpoints
# =============================================================================


@router.get("/scheduler/status", summary="Get scheduler running status")
def scheduler_status(user_name: str | None = None):
"""Get scheduler running status."""
Expand Down
69 changes: 60 additions & 9 deletions src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,20 +566,71 @@ def _submit_web_logs(

def get_web_log_messages(self) -> list[dict]:
"""
Retrieves all web log messages from the queue and returns them as a list of JSON-serializable dictionaries.

Returns:
List[dict]: A list of dictionaries representing ScheduleLogForWebItem objects,
ready for JSON serialization. The list is ordered from oldest to newest.
Retrieve structured log messages from the queue and return JSON-serializable dicts.
"""
messages = []
raw_items: list[ScheduleLogForWebItem] = []
while True:
try:
item = self._web_log_message_queue.get_nowait() # Thread-safe get
messages.append(item.to_dict())
raw_items.append(self._web_log_message_queue.get_nowait())
except Exception:
break
return messages

def _map_label(label: str) -> str:
from memos.mem_scheduler.schemas.general_schemas import (
QUERY_LABEL,
ANSWER_LABEL,
ADD_LABEL,
MEM_UPDATE_LABEL,
MEM_ORGANIZE_LABEL,
MEM_ARCHIVE_LABEL,
)

mapping = {
QUERY_LABEL: "addMessage",
ANSWER_LABEL: "addMessage",
ADD_LABEL: "addMemory",
MEM_UPDATE_LABEL: "updateMemory",
MEM_ORGANIZE_LABEL: "mergeMemory",
MEM_ARCHIVE_LABEL: "archiveMemory",
}
return mapping.get(label, label)

def _normalize_item(item: ScheduleLogForWebItem) -> dict:
data = item.to_dict()
data["label"] = _map_label(data.get("label"))
memcube_content = getattr(item, "memcube_log_content", None) or []
metadata = getattr(item, "metadata", None) or []

memcube_name = getattr(item, "memcube_name", None)
if not memcube_name and hasattr(self, "_map_memcube_name"):
memcube_name = self._map_memcube_name(item.mem_cube_id)
data["memcube_name"] = memcube_name

memory_len = getattr(item, "memory_len", None)
if memory_len is None:
if data["label"] == "mergeMemory":
memory_len = len([c for c in memcube_content if c.get("type") != "postMerge"])
elif memcube_content:
memory_len = len(memcube_content)
else:
memory_len = 1 if item.log_content else 0

data["memcube_log_content"] = memcube_content
data["memory_len"] = memory_len

def _with_memory_time(meta: dict) -> dict:
enriched = dict(meta)
if "memory_time" not in enriched:
enriched["memory_time"] = enriched.get("updated_at") or enriched.get(
"update_at"
)
return enriched

data["metadata"] = [_with_memory_time(m) for m in metadata]
data["log_title"] = ""
return data

return [_normalize_item(it) for it in raw_items]

def _message_consumer(self) -> None:
"""
Expand Down
Loading
Loading