diff --git a/examples/mem_reader/multimodel_struct_reader.py b/examples/mem_reader/multimodal_struct_reader.py similarity index 98% rename from examples/mem_reader/multimodel_struct_reader.py rename to examples/mem_reader/multimodal_struct_reader.py index 129662823..d132a4170 100644 --- a/examples/mem_reader/multimodel_struct_reader.py +++ b/examples/mem_reader/multimodal_struct_reader.py @@ -7,8 +7,8 @@ from dotenv import load_dotenv -from memos.configs.mem_reader import MultiModelStructMemReaderConfig -from memos.mem_reader.multi_model_struct import MultiModelStructMemReader +from memos.configs.mem_reader import MultiModalStructMemReaderConfig +from memos.mem_reader.multi_modal_struct import MultiModalStructMemReader from memos.memories.textual.item import ( SourceMessage, TextualMemoryItem, @@ -111,11 +111,11 @@ def get_reader_config() -> dict[str, Any]: """ Get reader configuration from environment variables. - Returns a dictionary that can be used to create MultiModelStructMemReaderConfig. + Returns a dictionary that can be used to create MultiModalStructMemReaderConfig. Similar to APIConfig.get_reader_config() in server_router_api.py. 
Returns: - Configuration dictionary for MultiModelStructMemReaderConfig + Configuration dictionary for MultiModalStructMemReaderConfig """ openai_api_key = os.getenv("OPENAI_API_KEY") openai_base_url = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1") @@ -228,13 +228,13 @@ def main(): if openai_api_key: # Use environment variables (similar to server_router_api.py) config_dict = get_reader_config() - reader_config = MultiModelStructMemReaderConfig.model_validate(config_dict) + reader_config = MultiModalStructMemReaderConfig.model_validate(config_dict) else: # Fall back to JSON file - reader_config = MultiModelStructMemReaderConfig.from_json_file( + reader_config = MultiModalStructMemReaderConfig.from_json_file( "examples/data/config/simple_struct_reader_config.json" ) - reader = MultiModelStructMemReader(reader_config) + reader = MultiModalStructMemReader(reader_config) # 2. Define scene data scene_data = [ diff --git a/examples/mem_reader/parser/__init__.py b/examples/mem_reader/parser/__init__.py new file mode 100644 index 000000000..3a947ae89 --- /dev/null +++ b/examples/mem_reader/parser/__init__.py @@ -0,0 +1 @@ +"""Parser examples for different message types.""" diff --git a/examples/mem_reader/parser/config_utils.py b/examples/mem_reader/parser/config_utils.py new file mode 100644 index 000000000..225b8b5b4 --- /dev/null +++ b/examples/mem_reader/parser/config_utils.py @@ -0,0 +1,132 @@ +"""Shared configuration utilities for parser examples. + +This module provides configuration functions that match the configuration +logic in examples/mem_reader/multimodal_struct_reader.py. +""" + +import os + +from typing import Any + +from memos.configs.embedder import EmbedderConfigFactory +from memos.configs.llm import LLMConfigFactory +from memos.embedders.factory import EmbedderFactory +from memos.llms.factory import LLMFactory + + +def get_reader_config() -> dict[str, Any]: + """ + Get reader configuration from environment variables. 
+ + Returns a dictionary that can be used to create MultiModalStructMemReaderConfig. + Matches the configuration logic in examples/mem_reader/multimodal_struct_reader.py. + + Returns: + Configuration dictionary with llm, embedder, and chunker configs + """ + openai_api_key = os.getenv("OPENAI_API_KEY") + openai_base_url = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1") + ollama_api_base = os.getenv("OLLAMA_API_BASE", "http://localhost:11434") + + # Get LLM backend and config + llm_backend = os.getenv("MEM_READER_LLM_BACKEND", "openai") + if llm_backend == "ollama": + llm_config = { + "backend": "ollama", + "config": { + "model_name_or_path": os.getenv("MEM_READER_LLM_MODEL", "qwen3:0.6b"), + "api_base": ollama_api_base, + "temperature": float(os.getenv("MEM_READER_LLM_TEMPERATURE", "0.0")), + "remove_think_prefix": os.getenv( + "MEM_READER_LLM_REMOVE_THINK_PREFIX", "true" + ).lower() + == "true", + "max_tokens": int(os.getenv("MEM_READER_LLM_MAX_TOKENS", "8192")), + }, + } + else: # openai + llm_config = { + "backend": "openai", + "config": { + "model_name_or_path": os.getenv("MEM_READER_LLM_MODEL", "gpt-4o-mini"), + "api_key": openai_api_key or os.getenv("MEMRADER_API_KEY", "EMPTY"), + "api_base": openai_base_url, + "temperature": float(os.getenv("MEM_READER_LLM_TEMPERATURE", "0.5")), + "remove_think_prefix": os.getenv( + "MEM_READER_LLM_REMOVE_THINK_PREFIX", "true" + ).lower() + == "true", + "max_tokens": int(os.getenv("MEM_READER_LLM_MAX_TOKENS", "8192")), + }, + } + + # Get embedder backend and config + embedder_backend = os.getenv( + "MEM_READER_EMBEDDER_BACKEND", os.getenv("MOS_EMBEDDER_BACKEND", "ollama") + ) + if embedder_backend == "universal_api": + embedder_config = { + "backend": "universal_api", + "config": { + "provider": os.getenv( + "MEM_READER_EMBEDDER_PROVIDER", os.getenv("MOS_EMBEDDER_PROVIDER", "openai") + ), + "api_key": os.getenv( + "MEM_READER_EMBEDDER_API_KEY", + os.getenv("MOS_EMBEDDER_API_KEY", openai_api_key or "sk-xxxx"), + ), 
+ "model_name_or_path": os.getenv( + "MEM_READER_EMBEDDER_MODEL", + os.getenv("MOS_EMBEDDER_MODEL", "text-embedding-3-large"), + ), + "base_url": os.getenv( + "MEM_READER_EMBEDDER_API_BASE", + os.getenv("MOS_EMBEDDER_API_BASE", openai_base_url), + ), + }, + } + else: # ollama + embedder_config = { + "backend": "ollama", + "config": { + "model_name_or_path": os.getenv( + "MEM_READER_EMBEDDER_MODEL", + os.getenv("MOS_EMBEDDER_MODEL", "nomic-embed-text:latest"), + ), + "api_base": ollama_api_base, + }, + } + + return { + "llm": llm_config, + "embedder": embedder_config, + "chunker": { + "backend": "sentence", + "config": { + "tokenizer_or_token_counter": "gpt2", + "chunk_size": 512, + "chunk_overlap": 128, + "min_sentences_per_chunk": 1, + }, + }, + } + + +def init_embedder_and_llm(): + """ + Initialize embedder and LLM from environment variables. + + Returns: + Tuple of (embedder, llm) instances + """ + config_dict = get_reader_config() + + # Initialize embedder + embedder_config = EmbedderConfigFactory.model_validate(config_dict["embedder"]) + embedder = EmbedderFactory.from_config(embedder_config) + + # Initialize LLM + llm_config = LLMConfigFactory.model_validate(config_dict["llm"]) + llm = LLMFactory.from_config(llm_config) + + return embedder, llm diff --git a/examples/mem_reader/parser/example_assistant_parser.py b/examples/mem_reader/parser/example_assistant_parser.py new file mode 100644 index 000000000..a77f04a68 --- /dev/null +++ b/examples/mem_reader/parser/example_assistant_parser.py @@ -0,0 +1,94 @@ +"""Example demonstrating AssistantParser usage. + +AssistantParser handles assistant messages in chat conversations. 
+""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv + +from memos.mem_reader.read_multi_modal.assistant_parser import AssistantParser + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def main(): + """Demonstrate AssistantParser usage.""" + print("=== AssistantParser Example ===\n") + + # 1. Initialize embedder and LLM (using shared config) + embedder, llm = init_embedder_and_llm() + + # 3. Create AssistantParser + parser = AssistantParser(embedder=embedder, llm=llm) + + # 4. Example assistant messages + assistant_messages = [ + { + "role": "assistant", + "content": "I'm sorry to hear that you're feeling down. Would you like to talk about what's been going on?", + "chat_time": "2025-01-15T10:00:30", + "message_id": "msg_001", + }, + { + "role": "assistant", + "content": "Based on the document you provided, I can see several key points: 1) The project timeline, 2) Budget considerations, and 3) Resource allocation.", + "chat_time": "2025-01-15T10:05:30", + "message_id": "msg_002", + }, + { + "role": "assistant", + "content": "Here's a Python solution for your problem:\n```python\ndef solve_problem():\n return 'solution'\n```", + "chat_time": "2025-01-15T10:10:30", + "message_id": "msg_003", + }, + ] + + print("šŸ“ Processing assistant messages:\n") + for i, message in enumerate(assistant_messages, 1): + print(f"Assistant Message {i}:") + print(f" Content: {message['content'][:60]}...") + + # Create source from assistant message + info = {"user_id": "user1", "session_id": "session1"} + source = parser.create_source(message, info) + + print(" āœ… Created SourceMessage:") + print(f" - Type: {source.type}") + print(f" - Role: {source.role}") + print(f" - 
Content: {source.content[:60]}...") + print(f" - Chat Time: {source.chat_time}") + print(f" - Message ID: {source.message_id}") + print() + + # Parse in fast mode + memory_items = parser.parse_fast(message, info) + print(f" šŸ“Š Fast mode generated {len(memory_items)} memory item(s)") + if memory_items: + print(f" - Memory: {memory_items[0].memory[:60]}...") + print(f" - Memory Type: {memory_items[0].metadata.memory_type}") + print(f" - Tags: {memory_items[0].metadata.tags}") + print() + + # Rebuild assistant message from source + rebuilt = parser.rebuild_from_source(source) + print(f" šŸ”„ Rebuilt message: role={rebuilt['role']}, content={rebuilt['content'][:40]}...") + print() + + print("āœ… AssistantParser example completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_reader/parser/example_file_content_parser.py b/examples/mem_reader/parser/example_file_content_parser.py new file mode 100644 index 000000000..06071a70c --- /dev/null +++ b/examples/mem_reader/parser/example_file_content_parser.py @@ -0,0 +1,132 @@ +"""Example demonstrating FileContentParser usage. + +FileContentParser handles file content parts in multimodal messages (RawMessageList). +""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv + +from memos.configs.parser import ParserConfigFactory +from memos.mem_reader.read_multi_modal.file_content_parser import FileContentParser +from memos.parsers.factory import ParserFactory + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def main(): + """Demonstrate FileContentParser usage.""" + print("=== FileContentParser Example ===\n") + + # 1. 
Initialize embedder and LLM (using shared config) + embedder, llm = init_embedder_and_llm() + + # 3. Initialize parser for file content parsing (optional) + try: + parser_config = ParserConfigFactory.model_validate( + { + "backend": "markitdown", + "config": {}, + } + ) + file_parser = ParserFactory.from_config(parser_config) + except Exception as e: + print(f"āš ļø Warning: Could not initialize file parser: {e}") + print(" FileContentParser will work without a parser, but file parsing will be limited.") + file_parser = None + + # 4. Create FileContentParser + parser = FileContentParser(embedder=embedder, llm=llm, parser=file_parser) + + # 5. Example file content parts + file_content_parts = [ + { + "type": "file", + "file": { + "filename": "document.pdf", + "file_id": "file_123", + "file_data": "This is the content extracted from the PDF file...", + }, + }, + { + "type": "file", + "file": { + "filename": "report.docx", + "file_id": "file_456", + "file_data": "Report content: Analysis of Q4 performance...", + }, + }, + { + "type": "file", + "file": { + "filename": "data.csv", + "file_id": "file_789", + "path": "/path/to/data.csv", # Alternative: using path instead of file_data + }, + }, + ] + + print("šŸ“ Processing file content parts:\n") + for i, part in enumerate(file_content_parts, 1): + print(f"File Content Part {i}:") + file_info = part.get("file", {}) + print(f" Filename: {file_info.get('filename', 'unknown')}") + print(f" File ID: {file_info.get('file_id', 'N/A')}") + + # Create source from file content part + info = {"user_id": "user1", "session_id": "session1"} + source = parser.create_source(part, info) + + print(" āœ… Created SourceMessage:") + print(f" - Type: {source.type}") + print(f" - Doc Path: {source.doc_path}") + if source.content: + print(f" - Content: {source.content[:60]}...") + if hasattr(source, "original_part") and source.original_part: + print(" - Has original_part: Yes") + print() + + # Rebuild file content part from source + rebuilt = 
parser.rebuild_from_source(source) + print(" šŸ”„ Rebuilt part:") + print(f" - Type: {rebuilt['type']}") + print(f" - Filename: {rebuilt['file'].get('filename', 'N/A')}") + print() + + # 6. Example with actual file path (if parser is available) + if file_parser: + print("šŸ“„ Testing file parsing with actual file path:\n") + # Note: This is just an example - actual file parsing would require a real file + example_file_part = { + "type": "file", + "file": { + "filename": "example.txt", + "path": "examples/mem_reader/text1.txt", # Using existing test file + }, + } + + try: + source = parser.create_source(example_file_part, info) + print(f" āœ… Created SourceMessage for file: {source.doc_path}") + # The parser would parse the file content if the file exists + except Exception as e: + print(f" āš ļø File parsing note: {e}") + print() + + print("āœ… FileContentParser example completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_reader/parser/example_multi_modal_parser.py b/examples/mem_reader/parser/example_multi_modal_parser.py new file mode 100644 index 000000000..3638d8d5e --- /dev/null +++ b/examples/mem_reader/parser/example_multi_modal_parser.py @@ -0,0 +1,400 @@ +"""Example demonstrating MultiModalParser parser selection. + +This example verifies that different input types correctly return +the corresponding parser instances. 
+ +MessagesType Definition (from src/memos/types/general_types.py): + MessagesType = str | MessageList | RawMessageList + + Where: + - str: Simple string messages + - MessageList: list[ChatCompletionMessageParam] + ChatCompletionMessageParam = ( + ChatCompletionSystemMessageParam | + ChatCompletionUserMessageParam | + ChatCompletionAssistantMessageParam | + ChatCompletionToolMessageParam + ) + - RawMessageList: list[RawMessageDict] + RawMessageDict = ChatCompletionContentPartTextParam | File + + Note: User/Assistant messages can have multimodal content (list of parts): + - {"type": "text", "text": "..."} + - {"type": "file", "file": {...}} + - {"type": "image_url", "image_url": {...}} + - {"type": "input_audio", "input_audio": {...}} +""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv + +from memos.mem_reader.read_multi_modal.multi_modal_parser import MultiModalParser + + +# Add src directory to path for imports +project_root = Path(__file__).parent.parent.parent.parent +src_path = project_root / "src" +if str(src_path) not in sys.path: + sys.path.insert(0, str(src_path)) + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def parser_selection(): + """Test that different input types return the correct parser.""" + print("=== MultiModalParser Parser Selection Test ===\n") + + # 1. Initialize embedder and LLM + embedder, llm = init_embedder_and_llm() + + # 2. Create MultiModalParser + parser = MultiModalParser(embedder=embedder, llm=llm) + + # 3. 
Test cases: different input types + test_cases = [ + # String input -> StringParser + { + "name": "String input", + "message": "This is a simple string message", + "expected_parser_type": "StringParser", + }, + # RawMessageList: text type -> TextContentParser + { + "name": "Text content part (RawMessageList)", + "message": {"type": "text", "text": "This is a text content part"}, + "expected_parser_type": "TextContentParser", + }, + # RawMessageList: file type -> FileContentParser + { + "name": "File content part (RawMessageList)", + "message": { + "type": "file", + "file": { + "filename": "example.pdf", + "file_data": "File content here", + }, + }, + "expected_parser_type": "FileContentParser", + }, + # RawMessageList: image_url type -> None (type_parsers uses "image" key, not "image_url") + { + "name": "Image content part (RawMessageList - image_url type)", + "message": { + "type": "image_url", + "image_url": { + "url": "https://example.com/image.jpg", + "detail": "auto", + }, + }, + "expected_parser_type": None, # type_parsers has "image" key, but message has "image_url" type + "should_return_none": True, + }, + # RawMessageList: input_audio type -> None (type_parsers uses "audio" key, not "input_audio") + { + "name": "Audio content part (RawMessageList - input_audio type)", + "message": { + "type": "input_audio", + "input_audio": { + "data": "base64_encoded_audio_data", + "format": "mp3", + }, + }, + "expected_parser_type": None, # type_parsers has "audio" key, but message has "input_audio" type + "should_return_none": True, + }, + # MessageList: system role -> SystemParser + { + "name": "System message", + "message": { + "role": "system", + "content": "You are a helpful assistant.", + }, + "expected_parser_type": "SystemParser", + }, + # MessageList: user role -> UserParser + { + "name": "User message (simple)", + "message": { + "role": "user", + "content": "Hello, how are you?", + }, + "expected_parser_type": "UserParser", + }, + # MessageList: user role with 
multimodal content -> UserParser + { + "name": "User message (multimodal with text and file)", + "message": { + "role": "user", + "content": [ + {"type": "text", "text": "What's in this image?"}, + {"type": "file", "file": {"filename": "image.jpg", "file_data": ""}}, + ], + }, + "expected_parser_type": "UserParser", + }, + # MessageList: user role with image_url content -> UserParser + { + "name": "User message (with image_url)", + "message": { + "role": "user", + "content": [ + {"type": "text", "text": "What's in this image?"}, + { + "type": "image_url", + "image_url": {"url": "https://example.com/image.jpg"}, + }, + ], + }, + "expected_parser_type": "UserParser", + }, + # MessageList: user role with input_audio content -> UserParser + { + "name": "User message (with input_audio)", + "message": { + "role": "user", + "content": [ + {"type": "text", "text": "Listen to this audio"}, + { + "type": "input_audio", + "input_audio": {"data": "base64_data", "format": "wav"}, + }, + ], + }, + "expected_parser_type": "UserParser", + }, + # MessageList: assistant role -> AssistantParser + { + "name": "Assistant message (simple)", + "message": { + "role": "assistant", + "content": "I'm doing well, thank you!", + }, + "expected_parser_type": "AssistantParser", + }, + # MessageList: assistant role with tool_calls -> AssistantParser + { + "name": "Assistant message (with tool_calls)", + "message": { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": "call_123", + "type": "function", + "function": { + "name": "get_weather", + "arguments": '{"location": "Beijing"}', + }, + } + ], + }, + "expected_parser_type": "AssistantParser", + }, + # MessageList: tool role -> ToolParser + { + "name": "Tool message", + "message": { + "role": "tool", + "content": "Tool execution result", + "tool_call_id": "call_123", + }, + "expected_parser_type": "ToolParser", + }, + ] + + print("Testing parser selection for different input types:\n") + all_passed = True + + for i, test_case 
in enumerate(test_cases, 1): + message = test_case["message"] + expected_type = test_case.get("expected_parser_type") + test_name = test_case["name"] + should_return_none = test_case.get("should_return_none", False) + + # Get parser using internal method + selected_parser = parser._get_parser(message) + + # Handle cases where None is expected + if should_return_none or expected_type is None: + if selected_parser is None: + print(f"āœ… Test {i}: {test_name}") + print(" Expected: None (parser not implemented yet or not found)") + print(" Got: None") + if expected_type: + print(f" Note: {expected_type} is not yet implemented") + else: + print(f"āš ļø Test {i}: {test_name}") + print(" Expected: None") + print(f" Got: {type(selected_parser).__name__}") + print(" Note: Parser found but may not be fully implemented") + print() + continue + + # Check if parser was found + if selected_parser is None: + print(f"āŒ Test {i}: {test_name}") + print(f" Expected: {expected_type}") + print(" Got: None (parser not found)") + print(f" Message: {message}\n") + all_passed = False + continue + + # Get actual parser type name + actual_type = type(selected_parser).__name__ + + # Verify parser type + if actual_type == expected_type: + print(f"āœ… Test {i}: {test_name}") + print(f" Expected: {expected_type}") + print(f" Got: {actual_type}") + print(f" Parser instance: {selected_parser}") + else: + print(f"āŒ Test {i}: {test_name}") + print(f" Expected: {expected_type}") + print(f" Got: {actual_type}") + print(f" Message: {message}") + all_passed = False + print() + + # Test edge cases + print("\n=== Testing Edge Cases ===\n") + + edge_cases = [ + { + "name": "Unknown message type (not dict, not str)", + "message": 12345, + "should_return_none": True, + }, + { + "name": "Dict without type or role", + "message": {"content": "Some content"}, + "should_return_none": True, + }, + { + "name": "Unknown type in RawMessageList", + "message": {"type": "unknown_type", "data": "some data"}, + 
"should_return_none": True, + }, + { + "name": "Unknown role in MessageList", + "message": {"role": "unknown_role", "content": "some content"}, + "should_return_none": True, + }, + { + "name": "List of messages (MessageList - not handled by _get_parser)", + "message": [ + {"role": "user", "content": "Message 1"}, + {"role": "assistant", "content": "Message 2"}, + ], + "should_return_none": True, # Lists are handled in parse(), not _get_parser() + }, + { + "name": "List of RawMessageList items (not handled by _get_parser)", + "message": [ + {"type": "text", "text": "Text content 1"}, + {"type": "file", "file": {"filename": "doc.pdf", "file_data": ""}}, + ], + "should_return_none": True, # Lists are handled in parse(), not _get_parser() + }, + ] + + for i, test_case in enumerate(edge_cases, 1): + message = test_case["message"] + should_return_none = test_case["should_return_none"] + test_name = test_case["name"] + + selected_parser = parser._get_parser(message) + + if should_return_none: + if selected_parser is None: + print(f"āœ… Edge Case {i}: {test_name}") + print(" Correctly returned None") + else: + print(f"āŒ Edge Case {i}: {test_name}") + print(" Expected: None") + print(f" Got: {type(selected_parser).__name__}") + all_passed = False + else: + if selected_parser is not None: + print(f"āœ… Edge Case {i}: {test_name}") + print(f" Got parser: {type(selected_parser).__name__}") + else: + print(f"āŒ Edge Case {i}: {test_name}") + print(" Expected: Parser") + print(" Got: None") + all_passed = False + print() + + # Summary + print("=" * 60) + if all_passed: + print("āœ… All tests passed! Parser selection is working correctly.") + else: + print("āŒ Some tests failed. 
Please check the output above.") + print("=" * 60) + + +def parser_instances(): + """Test that parser instances are correctly initialized.""" + print("\n=== Parser Instance Verification ===\n") + + embedder, llm = init_embedder_and_llm() + parser = MultiModalParser(embedder=embedder, llm=llm) + + # Verify all parser instances are initialized + parsers_to_check = { + "string_parser": "StringParser", + "system_parser": "SystemParser", + "user_parser": "UserParser", + "assistant_parser": "AssistantParser", + "tool_parser": "ToolParser", + "text_content_parser": "TextContentParser", + "file_content_parser": "FileContentParser", + } + + print("Checking parser instance initialization:\n") + all_initialized = True + + for attr_name, expected_type in parsers_to_check.items(): + parser_instance = getattr(parser, attr_name, None) + if parser_instance is None: + print(f"āŒ {attr_name}: Not initialized") + all_initialized = False + else: + actual_type = type(parser_instance).__name__ + if actual_type == expected_type: + print(f"āœ… {attr_name}: {actual_type}") + else: + print(f"āŒ {attr_name}: Expected {expected_type}, got {actual_type}") + all_initialized = False + + print() + if all_initialized: + print("āœ… All parser instances are correctly initialized!") + else: + print("āŒ Some parser instances are missing or incorrect.") + print() + + +def main(): + """Run all tests.""" + parser_selection() + parser_instances() + print("\nāœ… MultiModalParser example completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_reader/parser/example_string_parser.py b/examples/mem_reader/parser/example_string_parser.py new file mode 100644 index 000000000..3ec658a0e --- /dev/null +++ b/examples/mem_reader/parser/example_string_parser.py @@ -0,0 +1,66 @@ +"""Example demonstrating StringParser usage. + +StringParser handles simple string messages that need to be converted to memory items. 
+""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv + +from memos.mem_reader.read_multi_modal.string_parser import StringParser + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def main(): + """Demonstrate StringParser usage.""" + print("=== StringParser Example ===\n") + + # 1. Initialize embedder and LLM (using shared config) + embedder, llm = init_embedder_and_llm() + + # 3. Create StringParser + parser = StringParser(embedder=embedder, llm=llm) + + # 4. Example string messages + string_messages = [ + "This is a simple text message that needs to be parsed.", + "Another string message for processing.", + "StringParser handles plain text strings and converts them to SourceMessage objects.", + ] + + print("šŸ“ Processing string messages:\n") + for i, message in enumerate(string_messages, 1): + print(f"Message {i}: {message[:50]}...") + + # Create source from string + info = {"user_id": "user1", "session_id": "session1"} + source = parser.create_source(message, info) + + print(" āœ… Created SourceMessage:") + print(f" - Type: {source.type}") + print(f" - Content: {source.content[:50]}...") + print() + + # Rebuild string from source + rebuilt = parser.rebuild_from_source(source) + print(f" šŸ”„ Rebuilt string: {rebuilt[:50]}...") + print() + + print("āœ… StringParser example completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_reader/parser/example_system_parser.py b/examples/mem_reader/parser/example_system_parser.py new file mode 100644 index 000000000..bc684a32b --- /dev/null +++ b/examples/mem_reader/parser/example_system_parser.py @@ -0,0 +1,158 @@ +"""Example demonstrating SystemParser usage. 
+ +SystemParser handles system messages in chat conversations. +Note: System messages support multimodal content, but only text parts are allowed +(not file, image_url, or input_audio like user messages). +""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv + + +try: + from .print_utils import pretty_print_dict +except ImportError: + # Fallback if print_utils is not available + def pretty_print_dict(d): + import json + + print(json.dumps(d, indent=2, ensure_ascii=False)) + + +from memos.mem_reader.read_multi_modal.system_parser import SystemParser + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def main(): + """Demonstrate SystemParser usage.""" + print("=== SystemParser Example ===\n") + + # 1. Initialize embedder and LLM (using shared config) + embedder, llm = init_embedder_and_llm() + + # 3. Create SystemParser + parser = SystemParser(embedder=embedder, llm=llm) + + # 4. 
Example system messages (simple text) + simple_system_message = { + "role": "system", + "content": "You are a helpful assistant that provides clear and concise answers.", + "chat_time": "2025-01-15T10:00:00", + "message_id": "msg_001", + } + + print("šŸ“ Example 1: Simple text system message\n") + pretty_print_dict(simple_system_message) + + info = {"user_id": "user1", "session_id": "session1"} + source = parser.create_source(simple_system_message, info) + + print(" āœ… Created SourceMessage:") + print(f" - Type: {source.type}") + print(f" - Role: {source.role}") + print(f" - Content: {source.content[:60]}...") + print(f" - Chat Time: {source.chat_time}") + print(f" - Message ID: {source.message_id}") + print() + + # Parse in fast mode + memory_items = parser.parse_fast(simple_system_message, info) + print(f" šŸ“Š Fast mode generated {len(memory_items)} memory item(s)") + if memory_items: + print(f" - Memory: {memory_items[0].memory[:60]}...") + print(f" - Memory Type: {memory_items[0].metadata.memory_type}") + print(f" - Tags: {memory_items[0].metadata.tags}") + print() + + # 5. 
Example multimodal system message (multiple text parts) + # Note: System messages only support text parts, not file/image/audio + multimodal_system_message = { + "role": "system", + "content": [ + {"type": "text", "text": "You are a helpful assistant."}, + {"type": "text", "text": "Always provide clear and concise answers."}, + {"type": "text", "text": "If you don't know something, say so."}, + ], + "chat_time": "2025-01-15T10:05:00", + "message_id": "msg_002", + } + + print("šŸ“ Example 2: Multimodal system message (multiple text parts)\n") + pretty_print_dict(multimodal_system_message) + print(f"Message contains {len(multimodal_system_message['content'])} text parts") + + sources = parser.create_source(multimodal_system_message, info) + if isinstance(sources, list): + print(f" āœ… Created {len(sources)} SourceMessage(s):") + for i, src in enumerate(sources, 1): + print(f" [{i}] Type: {src.type}, Role: {src.role}") + print(f" Content: {src.content[:50]}...") + else: + print(f" āœ… Created SourceMessage: Type={sources.type}") + print() + + # Parse in fast mode + memory_items = parser.parse_fast(multimodal_system_message, info) + print(f" šŸ“Š Fast mode generated {len(memory_items)} memory item(s)") + if memory_items: + print(f" - Memory: {memory_items[0].memory[:60]}...") + print(f" - Memory Type: {memory_items[0].metadata.memory_type}") + print(f" - Tags: {memory_items[0].metadata.tags}") + # Show sources from memory item + if memory_items[0].metadata.sources: + print(f" - Sources: {len(memory_items[0].metadata.sources)} SourceMessage(s)") + print() + + # 6. 
Example with structured system instructions + structured_system_message = { + "role": "system", + "content": [ + { + "type": "text", + "text": "You are a coding assistant specialized in Python programming.", + }, + {"type": "text", "text": "Always write clean, well-documented code."}, + {"type": "text", "text": "Explain your reasoning when providing solutions."}, + ], + "chat_time": "2025-01-15T10:10:00", + "message_id": "msg_003", + } + + print("šŸ“ Example 3: Structured system instructions (multiple text parts)\n") + pretty_print_dict(structured_system_message) + + sources = parser.create_source(structured_system_message, info) + if isinstance(sources, list): + print(f" āœ… Created {len(sources)} SourceMessage(s):") + for i, src in enumerate(sources, 1): + print(f" [{i}] Type: {src.type}, Role: {src.role}") + print(f" Content: {src.content[:50]}...") + print() + + # Rebuild examples + print("šŸ”„ Rebuilding messages from sources:\n") + if isinstance(sources, list) and sources: + rebuilt = parser.rebuild_from_source(sources[0]) + else: + rebuilt = parser.rebuild_from_source(source) + if rebuilt: + pretty_print_dict(rebuilt) + print("āœ… SystemParser example completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_reader/parser/example_text_content_parser.py b/examples/mem_reader/parser/example_text_content_parser.py new file mode 100644 index 000000000..1eb64d033 --- /dev/null +++ b/examples/mem_reader/parser/example_text_content_parser.py @@ -0,0 +1,72 @@ +"""Example demonstrating TextContentParser usage. + +TextContentParser handles text content parts in multimodal messages (RawMessageList). 
+""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv + +from memos.mem_reader.read_multi_modal.text_content_parser import TextContentParser + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def main(): + """Demonstrate TextContentParser usage.""" + print("=== TextContentParser Example ===\n") + + # 1. Initialize embedder and LLM (using shared config) + embedder, llm = init_embedder_and_llm() + + # 3. Create TextContentParser + parser = TextContentParser(embedder=embedder, llm=llm) + + # 4. Example text content parts + text_content_parts = [ + {"type": "text", "text": "This is a simple text content part."}, + {"type": "text", "text": "TextContentParser handles text parts in multimodal messages."}, + { + "type": "text", + "text": "This parser is used when processing RawMessageList items that contain text content.", + }, + ] + + print("šŸ“ Processing text content parts:\n") + for i, part in enumerate(text_content_parts, 1): + print(f"Text Content Part {i}:") + print(f" Text: {part['text'][:60]}...") + + # Create source from text content part + info = {"user_id": "user1", "session_id": "session1"} + source = parser.create_source(part, info) + + print(" āœ… Created SourceMessage:") + print(f" - Type: {source.type}") + print(f" - Content: {source.content[:60]}...") + if hasattr(source, "original_part") and source.original_part: + print(" - Has original_part: Yes") + print() + + # Rebuild text content part from source + rebuilt = parser.rebuild_from_source(source) + print(f" šŸ”„ Rebuilt part: type={rebuilt['type']}, text={rebuilt['text'][:40]}...") + print() + + print("āœ… TextContentParser example completed!") + + +if __name__ == "__main__": + main() diff 
--git a/examples/mem_reader/parser/example_tool_parser.py b/examples/mem_reader/parser/example_tool_parser.py new file mode 100644 index 000000000..bf3f4e333 --- /dev/null +++ b/examples/mem_reader/parser/example_tool_parser.py @@ -0,0 +1,101 @@ +"""Example demonstrating ToolParser usage. + +ToolParser handles tool/function call messages in chat conversations. +""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv + +from memos.mem_reader.read_multi_modal.tool_parser import ToolParser + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def main(): + """Demonstrate ToolParser usage.""" + print("=== ToolParser Example ===\n") + + # 1. Initialize embedder and LLM (using shared config) + embedder, llm = init_embedder_and_llm() + + # 3. Create ToolParser + parser = ToolParser(embedder=embedder, llm=llm) + + # 4. Example tool messages + tool_messages = [ + { + "role": "tool", + "content": '{"result": "Weather in New York: 72°F, sunny"}', + "tool_call_id": "call_abc123", + "chat_time": "2025-01-15T10:00:30", + "message_id": "msg_001", + }, + { + "role": "tool", + "content": '{"status": "success", "data": {"items": [1, 2, 3]}}', + "tool_call_id": "call_def456", + "chat_time": "2025-01-15T10:05:30", + "message_id": "msg_002", + }, + { + "role": "tool", + "content": "Database query executed successfully. 
Retrieved 5 records.", + "tool_call_id": "call_ghi789", + "chat_time": "2025-01-15T10:10:30", + "message_id": "msg_003", + }, + ] + + print("šŸ“ Processing tool messages:\n") + for i, message in enumerate(tool_messages, 1): + print(f"Tool Message {i}:") + print(f" Content: {message['content'][:60]}...") + print(f" Tool Call ID: {message['tool_call_id']}") + + # Create source from tool message + info = {"user_id": "user1", "session_id": "session1"} + source = parser.create_source(message, info) + + print(" āœ… Created SourceMessage:") + print(f" - Type: {source.type}") + print(f" - Role: {source.role}") + print(f" - Content: {source.content[:60]}...") + print(f" - Chat Time: {source.chat_time}") + print(f" - Message ID: {source.message_id}") + print() + + # Parse in fast mode + memory_items = parser.parse_fast(message, info) + print(f" šŸ“Š Fast mode generated {len(memory_items)} memory item(s)") + if memory_items: + print(f" - Memory: {memory_items[0].memory[:60]}...") + print(f" - Memory Type: {memory_items[0].metadata.memory_type}") + print(f" - Tags: {memory_items[0].metadata.tags}") + print() + + # Rebuild tool message from source + rebuilt = parser.rebuild_from_source(source) + print(" šŸ”„ Rebuilt message:") + print(f" - Role: {rebuilt['role']}") + print(f" - Tool Call ID: {rebuilt.get('tool_call_id', 'N/A')}") + print(f" - Content: {rebuilt['content'][:40]}...") + print() + + print("āœ… ToolParser example completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_reader/parser/example_user_parser.py b/examples/mem_reader/parser/example_user_parser.py new file mode 100644 index 000000000..78a75b94f --- /dev/null +++ b/examples/mem_reader/parser/example_user_parser.py @@ -0,0 +1,135 @@ +"""Example demonstrating UserParser usage. + +UserParser handles user messages, including multimodal messages with text, files, images, etc. 
+""" + +import sys + +from pathlib import Path + +from dotenv import load_dotenv +from print_utils import pretty_print_dict + +from memos.mem_reader.read_multi_modal.user_parser import UserParser + + +# Handle imports for both script and module usage +try: + from .config_utils import init_embedder_and_llm +except ImportError: + # When running as script, add parent directory to path + sys.path.insert(0, str(Path(__file__).parent)) + from config_utils import init_embedder_and_llm + +# Load environment variables +load_dotenv() + + +def main(): + """Demonstrate UserParser usage.""" + print("=== UserParser Example ===\n") + + # 1. Initialize embedder and LLM (using shared config) + embedder, llm = init_embedder_and_llm() + + # 3. Create UserParser + parser = UserParser(embedder=embedder, llm=llm) + + # 4. Example user messages (simple text) + simple_user_message = { + "role": "user", + "content": "I'm feeling a bit down today. Can you help me?", + "chat_time": "2025-01-15T10:00:00", + "message_id": "msg_001", + } + + print("šŸ“ Example 1: Simple text user message\n") + pretty_print_dict(simple_user_message) + + info = {"user_id": "user1", "session_id": "session1"} + # Parse in fast mode + memory_items = parser.parse_fast(simple_user_message, info) + print(f" šŸ“Š Fast mode generated {len(memory_items)} memory item(s)") + if memory_items: + print(f" - Memory: {memory_items[0].memory[:60]}...") + print(f" - Memory Type: {memory_items[0].metadata.memory_type}") + print() + + # 5. 
Example multimodal user message (text + file) + multimodal_user_message = { + "role": "user", + "content": [ + {"type": "text", "text": "Please analyze this document:"}, + { + "type": "file", + "file": { + "filename": "report.pdf", + "file_id": "file_123", + "file_data": "This is the content of the PDF file...", + }, + }, + ], + "chat_time": "2025-01-15T10:05:00", + "message_id": "msg_002", + } + + print("šŸ“ Example 2: Multimodal user message (text + file)\n") + pretty_print_dict(multimodal_user_message) + print(f"Message contains {len(multimodal_user_message['content'])} parts") + + # Parse in fast mode + memory_items = parser.parse_fast(multimodal_user_message, info) + print(f" šŸ“Š Fast mode generated {len(memory_items)} memory item(s)") + for memory_item in memory_items: + sources = memory_item.metadata.sources + print(f" āœ… Created {len(sources)} SourceMessage(s):") + for i, src in enumerate(sources, 1): + print(f" [{i}] Type: {src.type}, Role: {src.role}") + if src.type == "text": + print(f" Content: {src.content[:50]}...") + elif src.type == "file": + print(f" Doc Path: {src.doc_path}") + print() + + # 6. 
Example with image_url (future support) + image_user_message = { + "role": "user", + "content": [ + {"type": "text", "text": "What's in this image?"}, + { + "type": "image_url", + "image_url": {"url": "https://example.com/image.jpg"}, + }, + ], + "chat_time": "2025-01-15T10:10:00", + "message_id": "msg_003", + } + print("šŸ“ Example 3: User message with image\n") + print(f"Message contains {len(image_user_message['content'])} parts") + pretty_print_dict(image_user_message) + + # Parse in fast mode + memory_items = parser.parse_fast(image_user_message, info) + print(f" šŸ“Š Fast mode generated {len(memory_items)} memory item(s)") + for memory_item in memory_items: + sources = memory_item.metadata.sources + print(f" āœ… Created {len(sources)} SourceMessage(s):") + for i, src in enumerate(sources, 1): + print(f" [{i}] Type: {src.type}, Role: {src.role}") + if src.type == "text": + print(f" Content: {src.content[:50]}...") + elif src.type == "file": + print(f" Doc Path: {src.doc_path}") + elif src.type == "image": + print(f" Image Path: {src.image_path}") + + # Rebuild examples + print("šŸ”„ Rebuilding messages from sources:\n") + rebuilt_simple = parser.rebuild_from_source(sources[1]) + if rebuilt_simple: + pretty_print_dict(rebuilt_simple) + print("āœ… UserParser example completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/mem_reader/parser/print_utils.py b/examples/mem_reader/parser/print_utils.py new file mode 100644 index 000000000..5eba1fa76 --- /dev/null +++ b/examples/mem_reader/parser/print_utils.py @@ -0,0 +1,11 @@ +import pprint + + +def pretty_print_dict(d: dict): + text = pprint.pformat(d, indent=2, width=120) + border = "═" * (max(len(line) for line in text.split("\n")) + 4) + + print(f"ā•”{border}ā•—") + for line in text.split("\n"): + print(f"ā•‘ {line.ljust(len(border) - 2)} ā•‘") + print(f"ā•š{border}ā•") diff --git a/src/memos/configs/mem_reader.py b/src/memos/configs/mem_reader.py index a653a5e68..34693ea68 100644 --- 
a/src/memos/configs/mem_reader.py +++ b/src/memos/configs/mem_reader.py @@ -45,8 +45,8 @@ class SimpleStructMemReaderConfig(BaseMemReaderConfig): """SimpleStruct MemReader configuration class.""" -class MultiModelStructMemReaderConfig(BaseMemReaderConfig): - """MultiModelStruct MemReader configuration class.""" +class MultiModalStructMemReaderConfig(BaseMemReaderConfig): + """MultiModalStruct MemReader configuration class.""" class StrategyStructMemReaderConfig(BaseMemReaderConfig): @@ -61,7 +61,7 @@ class MemReaderConfigFactory(BaseConfig): backend_to_class: ClassVar[dict[str, Any]] = { "simple_struct": SimpleStructMemReaderConfig, - "multimodel_struct": MultiModelStructMemReaderConfig, + "multimodal_struct": MultiModalStructMemReaderConfig, "strategy_struct": StrategyStructMemReaderConfig, } diff --git a/src/memos/mem_reader/factory.py b/src/memos/mem_reader/factory.py index 263f29001..ff24e5c77 100644 --- a/src/memos/mem_reader/factory.py +++ b/src/memos/mem_reader/factory.py @@ -2,7 +2,7 @@ from memos.configs.mem_reader import MemReaderConfigFactory from memos.mem_reader.base import BaseMemReader -from memos.mem_reader.multi_model_struct import MultiModelStructMemReader +from memos.mem_reader.multi_modal_struct import MultiModalStructMemReader from memos.mem_reader.simple_struct import SimpleStructMemReader from memos.mem_reader.strategy_struct import StrategyStructMemReader from memos.memos_tools.singleton import singleton_factory @@ -14,7 +14,7 @@ class MemReaderFactory(BaseMemReader): backend_to_class: ClassVar[dict[str, Any]] = { "simple_struct": SimpleStructMemReader, "strategy_struct": StrategyStructMemReader, - "multimodel_struct": MultiModelStructMemReader, + "multimodal_struct": MultiModalStructMemReader, } @classmethod diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py new file mode 100644 index 000000000..56405e12a --- /dev/null +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -0,0 +1,328 @@ 
import concurrent.futures
import traceback

from typing import Any

from memos import log
from memos.configs.mem_reader import MultiModalStructMemReaderConfig
from memos.context.context import ContextThreadPoolExecutor
from memos.mem_reader.read_multi_modal import MultiModalParser
from memos.mem_reader.simple_struct import SimpleStructMemReader
from memos.memories.textual.item import TextualMemoryItem
from memos.types import MessagesType
from memos.utils import timed


logger = log.get_logger(__name__)


class MultiModalStructMemReader(SimpleStructMemReader):
    """Multimodal implementation of MemReader that inherits from
    SimpleStructMemReader."""

    def __init__(self, config: MultiModalStructMemReaderConfig):
        """
        Initialize the MultiModalStructMemReader with configuration.

        Args:
            config: Configuration object for the reader
        """
        # Local import avoids widening module-level config dependencies.
        from memos.configs.mem_reader import SimpleStructMemReaderConfig

        # The multimodal config shares SimpleStruct's fields, so re-validate
        # it as a SimpleStructMemReaderConfig for the parent initializer.
        config_dict = config.model_dump(exclude_none=True)
        simple_config = SimpleStructMemReaderConfig(**config_dict)
        super().__init__(simple_config)

        # Initialize MultiModalParser for routing to different parsers
        self.multi_modal_parser = MultiModalParser(
            embedder=self.embedder,
            llm=self.llm,
            parser=None,
        )

    def _concat_multi_modal_memories(
        self, all_memory_items: list[TextualMemoryItem], max_tokens=None, overlap=200
    ) -> list[TextualMemoryItem]:
        """
        Aggregates memory items using sliding window logic similar to
        `_iter_chat_windows` in simple_struct:
        1. Groups items into windows based on token count (max_tokens)
        2. Each window has overlap tokens for context continuity
        3. Aggregates items within each window into a single memory item
        4. Determines memory_type based on roles in each window

        Args:
            all_memory_items: Per-message fast-mode memory items to merge
            max_tokens: Window budget; defaults to self.chat_window_max_tokens
            overlap: Token budget kept from the previous window for continuity

        Returns:
            List of aggregated window-level memory items
        """
        if not all_memory_items:
            return []

        # If only one item, return as-is (no need to aggregate)
        if len(all_memory_items) == 1:
            return all_memory_items

        max_tokens = max_tokens or self.chat_window_max_tokens
        windows = []
        buf_items = []
        cur_text = ""

        # Extract info from first item (all items should have same user_id, session_id)
        first_item = all_memory_items[0]
        info = {
            "user_id": first_item.metadata.user_id,
            "session_id": first_item.metadata.session_id,
            **(first_item.metadata.info or {}),
        }

        # Fix: the index from enumerate() was never used; iterate directly.
        for item in all_memory_items:
            item_text = item.memory or ""
            # Ensure line ends with newline (same format as simple_struct)
            line = item_text if item_text.endswith("\n") else f"{item_text}\n"

            # Check if adding this item would exceed max_tokens (same logic as _iter_chat_windows)
            # Note: The `and cur_text` condition ensures that single large messages are not truncated.
            # If cur_text is empty (new window), even if line exceeds max_tokens, it won't trigger output.
            if self._count_tokens(cur_text + line) > max_tokens and cur_text:
                # Yield current window
                window = self._build_window_from_items(buf_items, info)
                if window:
                    windows.append(window)

                # Keep overlap: remove items until remaining tokens <= overlap
                # (same logic as _iter_chat_windows)
                while (
                    buf_items
                    and self._count_tokens("".join([it.memory or "" for it in buf_items])) > overlap
                ):
                    buf_items.pop(0)
                # Recalculate cur_text from remaining items
                cur_text = "".join([it.memory or "" for it in buf_items])

            # Add item to current window (always, even if it exceeds max_tokens)
            # This ensures single large messages are not truncated, same as simple_struct
            buf_items.append(item)
            # Recalculate cur_text from all items in buffer (same as _iter_chat_windows)
            cur_text = "".join([it.memory or "" for it in buf_items])

        # Yield final window if any items remain
        if buf_items:
            window = self._build_window_from_items(buf_items, info)
            if window:
                windows.append(window)

        return windows

    def _build_window_from_items(
        self, items: list[TextualMemoryItem], info: dict[str, Any]
    ) -> TextualMemoryItem | None:
        """
        Build a single memory item from a window of items (similar to _build_fast_node).

        Args:
            items: List of TextualMemoryItem objects in the window
            info: Dictionary containing user_id and session_id

        Returns:
            Aggregated TextualMemoryItem or None if no valid content
        """
        if not items:
            return None

        # Collect all memory texts and sources
        memory_texts = []
        all_sources = []
        roles = set()

        for item in items:
            if item.memory:
                memory_texts.append(item.memory)

            # Collect sources and extract roles
            item_sources = item.metadata.sources or []
            if not isinstance(item_sources, list):
                item_sources = [item_sources]

            for source in item_sources:
                # Add source to all_sources
                all_sources.append(source)

                # Extract role from source (SourceMessage object or plain dict)
                if hasattr(source, "role") and source.role:
                    roles.add(source.role)
                elif isinstance(source, dict) and source.get("role"):
                    roles.add(source.get("role"))

        # Determine memory_type based on roles (same logic as simple_struct)
        # UserMemory if only user role, else LongTermMemory
        memory_type = "UserMemory" if roles == {"user"} else "LongTermMemory"

        # Merge all memory texts (preserve the format from parser)
        merged_text = "".join(memory_texts) if memory_texts else ""

        if not merged_text.strip():
            # If no text content, return None
            return None

        # Create aggregated memory item (similar to _build_fast_node in simple_struct)
        aggregated_item = self._make_memory_item(
            value=merged_text,
            info=info,
            memory_type=memory_type,
            tags=["mode:fast"],
            sources=all_sources,
        )

        return aggregated_item

    @timed
    def _process_multi_modal_data(
        self, scene_data_info: MessagesType, info, mode: str = "fine", **kwargs
    ) -> list[TextualMemoryItem]:
        """
        Process multimodal data using MultiModalParser.

        Args:
            scene_data_info: MessagesType input
            info: Dictionary containing user_id and session_id
            mode: mem-reader mode, fast for quick process while fine for
                better understanding via calling llm
            **kwargs: Additional parameters (mode, etc.)

        Returns:
            List of TextualMemoryItem produced for this scene
        """
        # Pop custom_tags from info (same as simple_struct.py)
        # must pop here, avoid add to info, only used in sync fine mode
        custom_tags = info.pop("custom_tags", None) if isinstance(info, dict) else None

        # Use MultiModalParser to parse the scene data
        # If it's a list, parse each item; otherwise parse as single message
        if isinstance(scene_data_info, list):
            # Parse each message in the list
            all_memory_items = []
            for msg in scene_data_info:
                items = self.multi_modal_parser.parse(msg, info, mode="fast", **kwargs)
                all_memory_items.extend(items)
            fast_memory_items = self._concat_multi_modal_memories(all_memory_items)

        else:
            # Parse as single message
            fast_memory_items = self.multi_modal_parser.parse(
                scene_data_info, info, mode="fast", **kwargs
            )

        if mode == "fast":
            return fast_memory_items
        else:
            # TODO: parallel call llm and get fine multimodal items
            # Part A: call llm
            fine_memory_items = []
            fine_memory_items_string_parser = fast_memory_items
            fine_memory_items.extend(fine_memory_items_string_parser)
            # Part B: get fine multimodal items

            for fast_item in fast_memory_items:
                sources = fast_item.metadata.sources
                for source in sources:
                    items = self.multi_modal_parser.process_transfer(
                        source, context_items=[fast_item], custom_tags=custom_tags
                    )
                    fine_memory_items.extend(items)
            logger.warning("Not Implemented Now!")
            return fine_memory_items

    @timed
    def _process_transfer_multi_modal_data(
        self,
        raw_node: TextualMemoryItem,
        custom_tags: list[str] | None = None,
    ) -> list[TextualMemoryItem]:
        """
        Process transfer for multimodal data.

        Each source is processed independently by its corresponding parser,
        which knows how to rebuild the original message and parse it in fine mode.

        Args:
            raw_node: Fast-mode memory item whose sources will be refined
            custom_tags: Optional tags forwarded to the fine-mode parsers

        Returns:
            List of refined TextualMemoryItem (empty if raw_node has no sources)
        """
        sources = raw_node.metadata.sources or []
        if not sources:
            logger.warning("[MultiModalStruct] No sources found in raw_node")
            return []

        # Extract info from raw_node (same as simple_struct.py)
        info = {
            "user_id": raw_node.metadata.user_id,
            "session_id": raw_node.metadata.session_id,
            **(raw_node.metadata.info or {}),
        }

        fine_memory_items = []
        # Part A: call llm
        fine_memory_items_string_parser = []
        fine_memory_items.extend(fine_memory_items_string_parser)
        # Part B: get fine multimodal items
        for source in sources:
            items = self.multi_modal_parser.process_transfer(
                source, context_items=[raw_node], info=info, custom_tags=custom_tags
            )
            fine_memory_items.extend(items)
        return fine_memory_items

    def get_scene_data_info(self, scene_data: list, type: str) -> list[list[Any]]:
        """
        Convert normalized MessagesType scenes into scene data info.
        For MultiModalStructMemReader, this is a simplified version that returns the scenes as-is.

        Args:
            scene_data: List of MessagesType scenes
            type: Type of scene_data: ['doc', 'chat']

        Returns:
            List of scene data info
        """
        # TODO: split messages
        return scene_data

    def _read_memory(
        self, messages: list[MessagesType], type: str, info: dict[str, Any], mode: str = "fine"
    ) -> list[list[TextualMemoryItem]]:
        """
        Read memories from scenes, processing each scene concurrently.

        Args:
            messages: List of MessagesType scenes
            type: Type of scene_data: ['doc', 'chat']
            info: Dictionary containing user_id and session_id
            mode: 'fast' or 'fine' processing mode

        Returns:
            List of per-scene memory item lists (failed scenes are logged and skipped)
        """
        list_scene_data_info = self.get_scene_data_info(messages, type)

        memory_list = []
        # Process Q&A pairs concurrently with context propagation
        with ContextThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self._process_multi_modal_data, scene_data_info, info, mode=mode)
                for scene_data_info in list_scene_data_info
            ]
            for future in concurrent.futures.as_completed(futures):
                try:
                    res_memory = future.result()
                    if res_memory is not None:
                        memory_list.append(res_memory)
                except Exception as e:
                    logger.error(f"Task failed with exception: {e}")
                    logger.error(traceback.format_exc())
        return memory_list

    def fine_transfer_simple_mem(
        self,
        input_memories: list[TextualMemoryItem],
        type: str,
        custom_tags: list[str] | None = None,
    ) -> list[list[TextualMemoryItem]]:
        """
        Refine fast-mode memories into fine-mode memories, one input at a time.

        Args:
            input_memories: Fast-mode memory items to refine
            type: Scene type (kept for interface compatibility; unused here)
            custom_tags: Optional tags forwarded to the fine-mode parsers

        Returns:
            List of per-input refined memory lists (failures logged and skipped)
        """
        if not input_memories:
            return []

        memory_list = []

        # Process Q&A pairs concurrently with context propagation
        with ContextThreadPoolExecutor() as executor:
            futures = [
                executor.submit(
                    self._process_transfer_multi_modal_data, scene_data_info, custom_tags
                )
                for scene_data_info in input_memories
            ]
            for future in concurrent.futures.as_completed(futures):
                try:
                    res_memory = future.result()
                    if res_memory is not None:
                        memory_list.append(res_memory)
                except Exception as e:
                    logger.error(f"Task failed with exception: {e}")
                    logger.error(traceback.format_exc())
        return memory_list
- - Args: - config: Configuration object for the reader - """ - from memos.configs.mem_reader import SimpleStructMemReaderConfig - - config_dict = config.model_dump(exclude_none=True) - simple_config = SimpleStructMemReaderConfig(**config_dict) - super().__init__(simple_config) - - # Initialize MultiModelParser for routing to different parsers - self.multi_model_parser = MultiModelParser( - embedder=self.embedder, - llm=self.llm, - parser=None, - ) - - def _concat_multi_model_memories( - self, all_memory_items: list[TextualMemoryItem] - ) -> list[TextualMemoryItem]: - # TODO: concat multi_model_memories - return all_memory_items - - @timed - def _process_multi_model_data( - self, scene_data_info: MessagesType, info, mode: str = "fine", **kwargs - ) -> list[TextualMemoryItem]: - """ - Process multi-model data using MultiModelParser. - - Args: - scene_data_info: MessagesType input - info: Dictionary containing user_id and session_id - mode: mem-reader mode, fast for quick process while fine for - better understanding via calling llm - **kwargs: Additional parameters (mode, etc.) 
- """ - # Pop custom_tags from info (same as simple_struct.py) - # must pop here, avoid add to info, only used in sync fine mode - custom_tags = info.pop("custom_tags", None) if isinstance(info, dict) else None - - # Use MultiModelParser to parse the scene data - # If it's a list, parse each item; otherwise parse as single message - if isinstance(scene_data_info, list): - # Parse each message in the list - all_memory_items = [] - for msg in scene_data_info: - items = self.multi_model_parser.parse(msg, info, mode="fast", **kwargs) - all_memory_items.extend(items) - fast_memory_items = self._concat_multi_model_memories(all_memory_items) - - else: - # Parse as single message - fast_memory_items = self.multi_model_parser.parse( - scene_data_info, info, mode="fast", **kwargs - ) - - if mode == "fast": - return fast_memory_items - else: - # TODO: parallel call llm and get fine multi model items - # Part A: call llm - fine_memory_items = [] - fine_memory_items_string_parser = [] - fine_memory_items.extend(fine_memory_items_string_parser) - # Part B: get fine multi model items - - for fast_item in fast_memory_items: - sources = fast_item.metadata.sources - for source in sources: - items = self.multi_model_parser.process_transfer( - source, context_items=[fast_item], custom_tags=custom_tags - ) - fine_memory_items.extend(items) - logger.warning("Not Implemented Now!") - return fine_memory_items - - @timed - def _process_transfer_multi_model_data( - self, - raw_node: TextualMemoryItem, - custom_tags: list[str] | None = None, - ) -> list[TextualMemoryItem]: - """ - Process transfer for multi-model data. - - Each source is processed independently by its corresponding parser, - which knows how to rebuild the original message and parse it in fine mode. 
- """ - sources = raw_node.metadata.sources or [] - if not sources: - logger.warning("[MultiModelStruct] No sources found in raw_node") - return [] - - # Extract info from raw_node (same as simple_struct.py) - info = { - "user_id": raw_node.metadata.user_id, - "session_id": raw_node.metadata.session_id, - **(raw_node.metadata.info or {}), - } - - fine_memory_items = [] - # Part A: call llm - fine_memory_items_string_parser = [] - fine_memory_items.extend(fine_memory_items_string_parser) - # Part B: get fine multi model items - for source in sources: - items = self.multi_model_parser.process_transfer( - source, context_items=[raw_node], info=info, custom_tags=custom_tags - ) - fine_memory_items.extend(items) - return fine_memory_items - - def get_scene_data_info(self, scene_data: list, type: str) -> list[list[Any]]: - """ - Convert normalized MessagesType scenes into scene data info. - For MultiModelStructMemReader, this is a simplified version that returns the scenes as-is. - - Args: - scene_data: List of MessagesType scenes - type: Type of scene_data: ['doc', 'chat'] - - Returns: - List of scene data info - """ - # TODO: split messages - return scene_data - - def _read_memory( - self, messages: list[MessagesType], type: str, info: dict[str, Any], mode: str = "fine" - ) -> list[list[TextualMemoryItem]]: - list_scene_data_info = self.get_scene_data_info(messages, type) - - memory_list = [] - # Process Q&A pairs concurrently with context propagation - with ContextThreadPoolExecutor() as executor: - futures = [ - executor.submit(self._process_multi_model_data, scene_data_info, info, mode=mode) - for scene_data_info in list_scene_data_info - ] - for future in concurrent.futures.as_completed(futures): - try: - res_memory = future.result() - if res_memory is not None: - memory_list.append(res_memory) - except Exception as e: - logger.error(f"Task failed with exception: {e}") - logger.error(traceback.format_exc()) - return memory_list - - def fine_transfer_simple_mem( - 
self, - input_memories: list[TextualMemoryItem], - type: str, - custom_tags: list[str] | None = None, - ) -> list[list[TextualMemoryItem]]: - if not input_memories: - return [] - - memory_list = [] - - # Process Q&A pairs concurrently with context propagation - with ContextThreadPoolExecutor() as executor: - futures = [ - executor.submit( - self._process_transfer_multi_model_data, scene_data_info, custom_tags - ) - for scene_data_info in input_memories - ] - for future in concurrent.futures.as_completed(futures): - try: - res_memory = future.result() - if res_memory is not None: - memory_list.append(res_memory) - except Exception as e: - logger.error(f"Task failed with exception: {e}") - logger.error(traceback.format_exc()) - return memory_list diff --git a/src/memos/mem_reader/read_multi_model/__init__.py b/src/memos/mem_reader/read_multi_modal/__init__.py similarity index 87% rename from src/memos/mem_reader/read_multi_model/__init__.py rename to src/memos/mem_reader/read_multi_modal/__init__.py index 39cd63743..5659b4a6a 100644 --- a/src/memos/mem_reader/read_multi_model/__init__.py +++ b/src/memos/mem_reader/read_multi_modal/__init__.py @@ -1,4 +1,4 @@ -"""Multi-model message parsers for different message types. +"""Multimodal message parsers for different message types. 
class AssistantParser(BaseMessageParser):
    """Parser for assistant messages.

    Handles multimodal assistant messages by creating one SourceMessage per content part.
    Supports text and refusal content parts, plus the top-level ``refusal``,
    ``tool_calls`` and ``audio`` fields.
    """

    def __init__(self, embedder: BaseEmbedder, llm: BaseLLM | None = None):
        """
        Initialize AssistantParser.

        Args:
            embedder: Embedder for generating embeddings
            llm: Optional LLM for fine mode processing
        """
        super().__init__(embedder, llm)

    def create_source(
        self,
        message: ChatCompletionAssistantMessageParam,
        info: dict[str, Any],
    ) -> SourceMessage | list[SourceMessage]:
        """
        Create SourceMessage(s) from assistant message.

        Handles:
        - content: str | list of content parts (text/refusal) | None
        - refusal: str | None (top-level refusal message)
        - tool_calls: list of tool calls (when content is None)
        - audio: Audio | None (audio response data)

        For multimodal messages (content is a list), creates one SourceMessage per part.
        For simple messages (content is str), creates a single SourceMessage.
        """
        if not isinstance(message, dict):
            return []

        role = message.get("role", "assistant")
        raw_content = message.get("content")
        refusal = message.get("refusal")
        tool_calls = message.get("tool_calls")
        audio = message.get("audio")
        chat_time = message.get("chat_time")
        message_id = message.get("message_id")

        sources = []

        if isinstance(raw_content, list):
            # Multimodal: create one SourceMessage per part
            # Note: Assistant messages only support "text" and "refusal" part types
            for part in raw_content:
                if isinstance(part, dict):
                    part_type = part.get("type", "")
                    if part_type == "text":
                        sources.append(
                            SourceMessage(
                                type="chat",
                                role=role,
                                chat_time=chat_time,
                                message_id=message_id,
                                content=part.get("text", ""),
                            )
                        )
                    elif part_type == "refusal":
                        sources.append(
                            SourceMessage(
                                type="refusal",
                                role=role,
                                chat_time=chat_time,
                                message_id=message_id,
                                content=part.get("refusal", ""),
                            )
                        )
                    else:
                        # Unknown part type - log warning but still create SourceMessage
                        logger.warning(
                            f"[AssistantParser] Unknown part type `{part_type}`. "
                            f"Expected `text` or `refusal`. Creating SourceMessage with placeholder content."
                        )
                        sources.append(
                            SourceMessage(
                                type="chat",
                                role=role,
                                chat_time=chat_time,
                                message_id=message_id,
                                content=f"[{part_type}]",
                            )
                        )
        elif raw_content is not None:
            # Simple message: single SourceMessage
            content = _extract_text_from_content(raw_content)
            if content:
                sources.append(
                    SourceMessage(
                        type="chat",
                        role=role,
                        chat_time=chat_time,
                        message_id=message_id,
                        content=content,
                    )
                )

        # Handle top-level refusal field
        if refusal:
            sources.append(
                SourceMessage(
                    type="refusal",
                    role=role,
                    chat_time=chat_time,
                    message_id=message_id,
                    content=refusal,
                )
            )

        # Handle tool_calls (when content is None or empty)
        if tool_calls:
            tool_calls_str = (
                json.dumps(tool_calls, ensure_ascii=False)
                if isinstance(tool_calls, list | dict)
                else str(tool_calls)
            )
            sources.append(
                SourceMessage(
                    type="tool_calls",
                    role=role,
                    chat_time=chat_time,
                    message_id=message_id,
                    content=f"[tool_calls]: {tool_calls_str}",
                )
            )

        # Handle audio (optional)
        if audio:
            audio_id = audio.get("id", "") if isinstance(audio, dict) else str(audio)
            sources.append(
                SourceMessage(
                    type="audio",
                    role=role,
                    chat_time=chat_time,
                    message_id=message_id,
                    content=f"[audio]: {audio_id}",
                )
            )

        return (
            sources
            if len(sources) > 1
            else (sources[0] if sources else SourceMessage(type="chat", role=role))
        )

    def rebuild_from_source(
        self,
        source: SourceMessage,
    ) -> ChatCompletionAssistantMessageParam:
        """Intentionally a no-op (returns None): only multimodal-specific
        sources need to be rebuilt into messages."""

    def parse_fast(
        self,
        message: ChatCompletionAssistantMessageParam,
        info: dict[str, Any],
        **kwargs,
    ) -> list[TextualMemoryItem]:
        """Fast-mode parse: fold an assistant message into one LongTermMemory item.

        Text content, refusal, tool_calls and audio are flattened into a single
        prefixed line which is embedded and stored with per-part sources.
        """
        if not isinstance(message, dict):
            logger.warning(f"[AssistantParser] Expected dict, got {type(message)}")
            return []

        role = message.get("role", "")
        raw_content = message.get("content")
        refusal = message.get("refusal")
        tool_calls = message.get("tool_calls")
        audio = message.get("audio")
        chat_time = message.get("chat_time", None)

        if role != "assistant":
            logger.warning(f"[AssistantParser] Expected role is `assistant`, got {role}")
            return []

        # Build content string from various sources
        content_parts = []

        # Extract content (can be str, list, or None)
        if raw_content is not None:
            extracted_content = _extract_text_from_content(raw_content)
            if extracted_content:
                content_parts.append(extracted_content)

        # Add top-level refusal if present
        if refusal:
            content_parts.append(f"[refusal]: {refusal}")

        # Add tool_calls if present (when content is None or empty)
        if tool_calls:
            tool_calls_str = (
                json.dumps(tool_calls, ensure_ascii=False)
                if isinstance(tool_calls, list | dict)
                else str(tool_calls)
            )
            content_parts.append(f"[tool_calls]: {tool_calls_str}")

        # Add audio if present
        if audio:
            audio_id = audio.get("id", "") if isinstance(audio, dict) else str(audio)
            content_parts.append(f"[audio]: {audio_id}")

        # Combine all content parts
        content = " ".join(content_parts) if content_parts else ""

        # Fix: the previous guard tested `line.strip()`, which always contains
        # the non-empty "assistant: " prefix and therefore never fired, so a
        # fully empty message still produced a junk memory item. Bail out when
        # there is nothing to record.
        if not content:
            return []

        parts = [f"{role}: "]
        if chat_time:
            parts.append(f"[{chat_time}]: ")
        prefix = "".join(parts)
        line = f"{prefix}{content}\n"
        memory_type = "LongTermMemory"

        # Create source(s) using parser's create_source method
        sources = self.create_source(message, info)
        if isinstance(sources, SourceMessage):
            sources = [sources]
        elif not sources:
            return []

        # Extract info fields
        info_ = info.copy()
        user_id = info_.pop("user_id", "")
        session_id = info_.pop("session_id", "")

        # Create memory item (equivalent to _make_memory_item)
        memory_item = TextualMemoryItem(
            memory=line,
            metadata=TreeNodeTextualMemoryMetadata(
                user_id=user_id,
                session_id=session_id,
                memory_type=memory_type,
                status="activated",
                tags=["mode:fast"],
                key=_derive_key(line),
                embedding=self.embedder.embed([line])[0],
                usage=[],
                sources=sources,
                background="",
                confidence=0.99,
                type="fact",
                info=info_,
            ),
        )

        return [memory_item]

    def parse_fine(
        self,
        message: ChatCompletionAssistantMessageParam,
        info: dict[str, Any],
        **kwargs,
    ) -> list[TextualMemoryItem]:
        """Fine mode is not implemented for assistant messages."""
        return []
Args: embedder: Embedder for generating embeddings @@ -88,7 +88,7 @@ def _get_parser(self, message: Any) -> BaseMessageParser | None: # Handle dict messages if not isinstance(message, dict): - logger.warning(f"[MultiModelParser] Unknown message type: {type(message)}") + logger.warning(f"[MultiModalParser] Unknown message type: {type(message)}") return None # Check if it's a RawMessageList item (text or file) @@ -105,7 +105,7 @@ def _get_parser(self, message: Any) -> BaseMessageParser | None: if parser: return parser - logger.warning(f"[MultiModelParser] Could not determine parser for message: {message}") + logger.warning(f"[MultiModalParser] Could not determine parser for message: {message}") return None def parse( @@ -134,14 +134,14 @@ def parse( # Get appropriate parser parser = self._get_parser(message) if not parser: - logger.warning(f"[MultiModelParser] No parser found for message: {message}") + logger.warning(f"[MultiModalParser] No parser found for message: {message}") return [] # Parse using the appropriate parser try: return parser.parse(message, info, mode=mode, **kwargs) except Exception as e: - logger.error(f"[MultiModelParser] Error parsing message: {e}") + logger.error(f"[MultiModalParser] Error parsing message: {e}") return [] def parse_batch( @@ -192,7 +192,7 @@ def process_transfer( List of TextualMemoryItem objects from fine mode parsing """ if not self.llm: - logger.warning("[MultiModelParser] LLM not available for process_transfer") + logger.warning("[MultiModalParser] LLM not available for process_transfer") return [] # Extract info from context_items if available @@ -219,14 +219,14 @@ def process_transfer( parser = self.role_parsers.get(source.role) if not parser: - logger.warning(f"[MultiModelParser] Could not determine parser for source: {source}") + logger.warning(f"[MultiModalParser] Could not determine parser for source: {source}") return [] # Rebuild message from source using parser's method try: message = 
class SystemParser(BaseMessageParser):
    """Parser for system messages."""

    def __init__(self, embedder: BaseEmbedder, llm: BaseLLM | None = None):
        """
        Initialize SystemParser.

        Args:
            embedder: Embedder for generating embeddings
            llm: Optional LLM for fine mode processing
        """
        super().__init__(embedder, llm)

    def create_source(
        self,
        message: ChatCompletionSystemMessageParam,
        info: dict[str, Any],
    ) -> SourceMessage | list[SourceMessage]:
        """
        Create SourceMessage(s) from system message.

        For multimodal messages (content is a list of text parts), creates one SourceMessage per part.
        For simple messages (content is str), creates a single SourceMessage.
        """
        if not isinstance(message, dict):
            return []

        role = message.get("role", "system")
        raw_content = message.get("content", "")
        chat_time = message.get("chat_time")
        message_id = message.get("message_id")

        sources = []

        if isinstance(raw_content, list):
            # Multimodal: create one SourceMessage per text part
            # (system messages only carry "text" parts; other part types are skipped)
            for part in raw_content:
                if isinstance(part, dict):
                    part_type = part.get("type", "")
                    if part_type == "text":
                        sources.append(
                            SourceMessage(
                                type="chat",
                                role=role,
                                chat_time=chat_time,
                                message_id=message_id,
                                content=part.get("text", ""),
                            )
                        )
        else:
            # Simple message: single SourceMessage
            content = _extract_text_from_content(raw_content)
            if content:
                sources.append(
                    SourceMessage(
                        type="chat",
                        role=role,
                        chat_time=chat_time,
                        message_id=message_id,
                        content=content,
                    )
                )

        return (
            sources
            if len(sources) > 1
            else (sources[0] if sources else SourceMessage(type="chat", role=role))
        )

    def rebuild_from_source(
        self,
        source: SourceMessage,
    ) -> ChatCompletionSystemMessageParam:
        """Intentionally a no-op (returns None): only multimodal-specific
        sources need to be rebuilt into messages."""

    def parse_fast(
        self,
        message: ChatCompletionSystemMessageParam,
        info: dict[str, Any],
        **kwargs,
    ) -> list[TextualMemoryItem]:
        """Fast-mode parse: turn a system message into one LongTermMemory item."""
        if not isinstance(message, dict):
            logger.warning(f"[SystemParser] Expected dict, got {type(message)}")
            return []

        role = message.get("role", "")
        raw_content = message.get("content", "")
        chat_time = message.get("chat_time", None)
        content = _extract_text_from_content(raw_content)
        if role != "system":
            logger.warning(f"[SystemParser] Expected role is `system`, got {role}")
            return []

        # Fix: the previous guard tested `line`, which always embeds the
        # non-empty "system: " prefix and so never fired, letting empty
        # messages produce junk memory items. Skip when there is no
        # extractable content.
        if not content:
            return []

        parts = [f"{role}: "]
        if chat_time:
            parts.append(f"[{chat_time}]: ")
        prefix = "".join(parts)
        line = f"{prefix}{content}\n"
        memory_type = "LongTermMemory"

        # Create source(s) using parser's create_source method
        sources = self.create_source(message, info)
        if isinstance(sources, SourceMessage):
            sources = [sources]
        elif not sources:
            return []

        # Extract info fields
        info_ = info.copy()
        user_id = info_.pop("user_id", "")
        session_id = info_.pop("session_id", "")

        # Create memory item (equivalent to _make_memory_item)
        memory_item = TextualMemoryItem(
            memory=line,
            metadata=TreeNodeTextualMemoryMetadata(
                user_id=user_id,
                session_id=session_id,
                memory_type=memory_type,
                status="activated",
                tags=["mode:fast"],
                key=_derive_key(line),
                embedding=self.embedder.embed([line])[0],
                usage=[],
                sources=sources,
                background="",
                confidence=0.99,
                type="fact",
                info=info_,
            ),
        )

        return [memory_item]

    def parse_fine(
        self,
        message: ChatCompletionSystemMessageParam,
        info: dict[str, Any],
        **kwargs,
    ) -> list[TextualMemoryItem]:
        """Fine mode is not implemented for system messages."""
        return []
b/src/memos/mem_reader/read_multi_modal/user_parser.py similarity index 66% rename from src/memos/mem_reader/read_multi_model/user_parser.py rename to src/memos/mem_reader/read_multi_modal/user_parser.py index 7dc505167..8cf667a4b 100644 --- a/src/memos/mem_reader/read_multi_model/user_parser.py +++ b/src/memos/mem_reader/read_multi_modal/user_parser.py @@ -5,10 +5,14 @@ from memos.embedders.base import BaseEmbedder from memos.llms.base import BaseLLM from memos.log import get_logger -from memos.memories.textual.item import SourceMessage, TextualMemoryItem +from memos.memories.textual.item import ( + SourceMessage, + TextualMemoryItem, + TreeNodeTextualMemoryMetadata, +) from memos.types.openai_chat_completion_types import ChatCompletionUserMessageParam -from .base import BaseMessageParser, _extract_text_from_content +from .base import BaseMessageParser, _derive_key, _extract_text_from_content logger = get_logger(__name__) @@ -117,51 +121,7 @@ def rebuild_from_source( self, source: SourceMessage, ) -> ChatCompletionUserMessageParam: - """ - Rebuild user message from SourceMessage. - - If source has original_part, use it directly. - Otherwise, reconstruct from source fields. 
- """ - # Priority 1: Use original_part if available - if hasattr(source, "original_part") and source.original_part: - original = source.original_part - # If it's a content part, wrap it in a message - if isinstance(original, dict) and "type" in original: - return { - "role": source.role or "user", - "content": [original], - "chat_time": source.chat_time, - "message_id": source.message_id, - } - # If it's already a full message, return it - if isinstance(original, dict) and "role" in original: - return original - - # Priority 2: Rebuild from source fields - if source.type == "file": - return { - "role": source.role or "user", - "content": [ - { - "type": "file", - "file": { - "filename": source.doc_path or "", - "file_data": source.content or "", - }, - } - ], - "chat_time": source.chat_time, - "message_id": source.message_id, - } - - # Simple text message - return { - "role": source.role or "user", - "content": source.content or "", - "chat_time": source.chat_time, - "message_id": source.message_id, - } + """We only need rebuild from specific multimodal source""" def parse_fast( self, @@ -169,7 +129,60 @@ def parse_fast( info: dict[str, Any], **kwargs, ) -> list[TextualMemoryItem]: - return super().parse_fast(message, info, **kwargs) + if not isinstance(message, dict): + logger.warning(f"[UserParser] Expected dict, got {type(message)}") + return [] + + role = message.get("role", "") + # TODO: if file/url/audio etc in content, how to transfer them into a + # readable string? 
+ content = message.get("content", "") + chat_time = message.get("chat_time", None) + if role != "user": + logger.warning(f"[UserParser] Expected role is `user`, got {role}") + return [] + parts = [f"{role}: "] + if chat_time: + parts.append(f"[{chat_time}]: ") + prefix = "".join(parts) + line = f"{prefix}{content}\n" + if not line: + return [] + memory_type = "UserMemory" + + # Create source(s) using parser's create_source method + sources = self.create_source(message, info) + if isinstance(sources, SourceMessage): + sources = [sources] + elif not sources: + return [] + + # Extract info fields + info_ = info.copy() + user_id = info_.pop("user_id", "") + session_id = info_.pop("session_id", "") + + # Create memory item (equivalent to _make_memory_item) + memory_item = TextualMemoryItem( + memory=line, + metadata=TreeNodeTextualMemoryMetadata( + user_id=user_id, + session_id=session_id, + memory_type=memory_type, + status="activated", + tags=["mode:fast"], + key=_derive_key(line), + embedding=self.embedder.embed([line])[0], + usage=[], + sources=sources, + background="", + confidence=0.99, + type="fact", + info=info_, + ), + ) + + return [memory_item] def parse_fine( self, @@ -177,4 +190,9 @@ def parse_fine( info: dict[str, Any], **kwargs, ) -> list[TextualMemoryItem]: + logger.info( + "ChatCompletionUserMessageParam is inherently a " + "text-only modality. No special multimodal handling" + " is required in fine mode." 
+ ) return [] diff --git a/src/memos/mem_reader/read_multi_model/utils.py b/src/memos/mem_reader/read_multi_modal/utils.py similarity index 100% rename from src/memos/mem_reader/read_multi_model/utils.py rename to src/memos/mem_reader/read_multi_modal/utils.py diff --git a/src/memos/mem_reader/read_multi_model/assistant_parser.py b/src/memos/mem_reader/read_multi_model/assistant_parser.py deleted file mode 100644 index 726a954d3..000000000 --- a/src/memos/mem_reader/read_multi_model/assistant_parser.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Parser for assistant messages.""" - -from typing import Any - -from memos.embedders.base import BaseEmbedder -from memos.llms.base import BaseLLM -from memos.log import get_logger -from memos.memories.textual.item import SourceMessage, TextualMemoryItem -from memos.types.openai_chat_completion_types import ChatCompletionAssistantMessageParam - -from .base import BaseMessageParser, _extract_text_from_content - - -logger = get_logger(__name__) - - -class AssistantParser(BaseMessageParser): - """Parser for assistant messages.""" - - def __init__(self, embedder: BaseEmbedder, llm: BaseLLM | None = None): - """ - Initialize AssistantParser. 
- - Args: - embedder: Embedder for generating embeddings - llm: Optional LLM for fine mode processing - """ - super().__init__(embedder, llm) - - def create_source( - self, - message: ChatCompletionAssistantMessageParam, - info: dict[str, Any], - ) -> SourceMessage: - """Create SourceMessage from assistant message.""" - if not isinstance(message, dict): - return SourceMessage(type="chat", role="assistant") - - content = _extract_text_from_content(message.get("content", "")) - return SourceMessage( - type="chat", - role="assistant", - chat_time=message.get("chat_time"), - message_id=message.get("message_id"), - content=content, - ) - - def rebuild_from_source( - self, - source: SourceMessage, - ) -> ChatCompletionAssistantMessageParam: - """Rebuild assistant message from SourceMessage.""" - return { - "role": "assistant", - "content": source.content or "", - "chat_time": source.chat_time, - "message_id": source.message_id, - } - - def parse_fast( - self, - message: ChatCompletionAssistantMessageParam, - info: dict[str, Any], - **kwargs, - ) -> list[TextualMemoryItem]: - return super().parse_fast(message, info, **kwargs) - - def parse_fine( - self, - message: ChatCompletionAssistantMessageParam, - info: dict[str, Any], - **kwargs, - ) -> list[TextualMemoryItem]: - return [] diff --git a/src/memos/mem_reader/read_multi_model/system_parser.py b/src/memos/mem_reader/read_multi_model/system_parser.py deleted file mode 100644 index 258b752cc..000000000 --- a/src/memos/mem_reader/read_multi_model/system_parser.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Parser for system messages.""" - -from typing import Any - -from memos.embedders.base import BaseEmbedder -from memos.llms.base import BaseLLM -from memos.log import get_logger -from memos.memories.textual.item import SourceMessage, TextualMemoryItem -from memos.types.openai_chat_completion_types import ChatCompletionSystemMessageParam - -from .base import BaseMessageParser, _extract_text_from_content - - -logger = 
get_logger(__name__) - - -class SystemParser(BaseMessageParser): - """Parser for system messages.""" - - def __init__(self, embedder: BaseEmbedder, llm: BaseLLM | None = None): - """ - Initialize SystemParser. - - Args: - embedder: Embedder for generating embeddings - llm: Optional LLM for fine mode processing - """ - super().__init__(embedder, llm) - - def create_source( - self, - message: ChatCompletionSystemMessageParam, - info: dict[str, Any], - ) -> SourceMessage: - """Create SourceMessage from system message.""" - if not isinstance(message, dict): - return SourceMessage(type="chat", role="system") - - content = _extract_text_from_content(message.get("content", "")) - return SourceMessage( - type="chat", - role="system", - chat_time=message.get("chat_time"), - message_id=message.get("message_id"), - content=content, - ) - - def rebuild_from_source( - self, - source: SourceMessage, - ) -> ChatCompletionSystemMessageParam: - """Rebuild system message from SourceMessage.""" - return { - "role": "system", - "content": source.content or "", - "chat_time": source.chat_time, - "message_id": source.message_id, - } - - def parse_fast( - self, - message: ChatCompletionSystemMessageParam, - info: dict[str, Any], - **kwargs, - ) -> list[TextualMemoryItem]: - return super().parse_fast(message, info, **kwargs) - - def parse_fine( - self, - message: ChatCompletionSystemMessageParam, - info: dict[str, Any], - **kwargs, - ) -> list[TextualMemoryItem]: - return [] diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 627a5793b..53a7de035 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -16,7 +16,7 @@ from memos.embedders.factory import EmbedderFactory from memos.llms.factory import LLMFactory from memos.mem_reader.base import BaseMemReader -from memos.mem_reader.read_multi_model import coerce_scene_data +from memos.mem_reader.read_multi_modal import coerce_scene_data from 
memos.memories.textual.item import ( SourceMessage, TextualMemoryItem,