From adc00f4fafbbade69c8053ae4a1c8ed0ac4b771e Mon Sep 17 00:00:00 2001 From: kartik-mem0 Date: Wed, 1 Apr 2026 20:24:24 +0530 Subject: [PATCH 1/3] feat: add Mem0 memory integration with config, implementation, docs, tests, and dependency --- docs/user_guide/en/modules/memory.md | 19 + entity/configs/__init__.py | 2 + entity/configs/node/memory.py | 69 ++++ pyproject.toml | 1 + runtime/node/agent/memory/builtin_stores.py | 14 + runtime/node/agent/memory/mem0_memory.py | 203 +++++++++++ tests/test_mem0_memory.py | 384 ++++++++++++++++++++ yaml_instance/demo_mem0_memory.yaml | 47 +++ 8 files changed, 739 insertions(+) create mode 100644 runtime/node/agent/memory/mem0_memory.py create mode 100644 tests/test_mem0_memory.py create mode 100644 yaml_instance/demo_mem0_memory.yaml diff --git a/docs/user_guide/en/modules/memory.md b/docs/user_guide/en/modules/memory.md index 7f71f677fe..f494cb2fcc 100755 --- a/docs/user_guide/en/modules/memory.md +++ b/docs/user_guide/en/modules/memory.md @@ -32,12 +32,23 @@ memory: model: text-embedding-3-small ``` +### Mem0 Memory Config +```yaml +memory: + - name: agent_memory + type: mem0 + config: + api_key: ${MEM0_API_KEY} + agent_id: my-agent +``` + ## 3. Built-in Store Comparison | Type | Path | Highlights | Best for | | --- | --- | --- | --- | | `simple` | `node/agent/memory/simple_memory.py` | Optional disk persistence (JSON) after runs; FAISS + semantic rerank; read/write capable. | Small conversation history, prototypes. | | `file` | `node/agent/memory/file_memory.py` | Chunks files/dirs into a vector index, read-only, auto rebuilds when files change. | Knowledge bases, doc QA. | | `blackboard` | `node/agent/memory/blackboard_memory.py` | Lightweight append-only log trimmed by time/count; no vector search. | Broadcast boards, pipeline debugging. | +| `mem0` | `node/agent/memory/mem0_memory.py` | Cloud-managed by Mem0; semantic search + graph relationships; no local embeddings or persistence needed. Requires `mem0ai` package. | Production memory, cross-session persistence, multi-agent memory sharing. | All stores register through `register_memory_store()` so summaries show up in UI via `MemoryStoreConfig.field_specs()`. @@ -98,6 +109,14 @@ This schema lets multimodal outputs flow into Memory/Thinking modules without ex - **Retrieval** – Returns the latest `top_k` entries ordered by time. - **Write** – `update()` appends the latest snapshot (input/output blocks, attachments, previews). No embeddings are generated, so retrieval is purely recency-based. +### 5.4 Mem0Memory +- **Config** – Requires `api_key` (from [app.mem0.ai](https://app.mem0.ai)). Optional `user_id`, `agent_id`, `org_id`, `project_id` for scoping. +- **Important**: `user_id` and `agent_id` are mutually exclusive in Mem0 API calls. If both are configured, two separate searches are made and results merged. For writes, `agent_id` takes precedence. Agent-generated content is stored with `role: "assistant"`. +- **Retrieval** – Uses Mem0's server-side semantic search. Supports `top_k` and `similarity_threshold` via `MemoryAttachmentConfig`. +- **Write** – `update()` sends conversation messages to Mem0 via the SDK. Agent outputs use `role: "assistant"`, user inputs use `role: "user"`. +- **Persistence** – Fully cloud-managed. `load()` and `save()` are no-ops. Memories persist across runs and sessions automatically. +- **Dependencies** – Requires `mem0ai` package (`pip install mem0ai`). + ## 6. EmbeddingConfig Notes - Fields: `provider`, `model`, `api_key`, `base_url`, `params`. - `provider=openai` uses the official client; override `base_url` for compatibility layers. diff --git a/entity/configs/__init__.py b/entity/configs/__init__.py index d0ef7044fe..b7d3350c40 100755 --- a/entity/configs/__init__.py +++ b/entity/configs/__init__.py @@ -10,6 +10,7 @@ EmbeddingConfig, FileMemoryConfig, FileSourceConfig, + Mem0MemoryConfig, MemoryAttachmentConfig, MemoryStoreConfig, SimpleMemoryConfig, @@ -43,6 +44,7 @@ "FunctionToolConfig", "GraphDefinition", "HumanConfig", + "Mem0MemoryConfig", "MemoryAttachmentConfig", "MemoryStoreConfig", "McpLocalConfig", diff --git a/entity/configs/node/memory.py b/entity/configs/node/memory.py index 3d1d8a091f..970183e0c2 100755 --- a/entity/configs/node/memory.py +++ b/entity/configs/node/memory.py @@ -279,6 +279,75 @@ def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "BlackboardMemoryCo } +@dataclass +class Mem0MemoryConfig(BaseConfig): + """Configuration for Mem0 managed memory service.""" + + api_key: str = "" + org_id: str | None = None + project_id: str | None = None + user_id: str | None = None + agent_id: str | None = None + + @classmethod + def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "Mem0MemoryConfig": + mapping = require_mapping(data, path) + api_key = require_str(mapping, "api_key", path) + org_id = optional_str(mapping, "org_id", path) + project_id = optional_str(mapping, "project_id", path) + user_id = optional_str(mapping, "user_id", path) + agent_id = optional_str(mapping, "agent_id", path) + return cls( + api_key=api_key, + org_id=org_id, + project_id=project_id, + user_id=user_id, + agent_id=agent_id, + path=path, + ) + + FIELD_SPECS = { + "api_key": ConfigFieldSpec( + name="api_key", + display_name="Mem0 API Key", + type_hint="str", + required=True, + description="Mem0 API key (get one from app.mem0.ai)", + default="${MEM0_API_KEY}", + ), + "org_id": ConfigFieldSpec( + name="org_id", + display_name="Organization ID", + type_hint="str", + required=False, + description="Mem0 organization ID for scoping", + advance=True, + ), + "project_id": ConfigFieldSpec( + name="project_id", + display_name="Project ID", + type_hint="str", + required=False, + description="Mem0 project ID for scoping", + advance=True, + ), + "user_id": ConfigFieldSpec( + name="user_id", + display_name="User ID", + type_hint="str", + required=False, + description="User ID for user-scoped memories. Mutually exclusive with agent_id in API calls.", + ), + "agent_id": ConfigFieldSpec( + name="agent_id", + display_name="Agent ID", + type_hint="str", + required=False, + description="Agent ID for agent-scoped memories. Mutually exclusive with user_id in API calls.", + ), + } + + @dataclass class MemoryStoreConfig(BaseConfig): name: str diff --git a/pyproject.toml b/pyproject.toml index e1f46ca00e..22c5b4bc04 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "filelock>=3.20.1", "markdown>=3.10", "xhtml2pdf>=0.2.17", + "mem0ai>=1.0.9", ] [build-system] diff --git a/runtime/node/agent/memory/builtin_stores.py b/runtime/node/agent/memory/builtin_stores.py index 9b1986c942..bd10a62f79 100755 --- a/runtime/node/agent/memory/builtin_stores.py +++ b/runtime/node/agent/memory/builtin_stores.py @@ -3,6 +3,7 @@ from entity.configs.node.memory import ( BlackboardMemoryConfig, FileMemoryConfig, + Mem0MemoryConfig, SimpleMemoryConfig, MemoryStoreConfig, ) @@ -34,6 +35,19 @@ ) +def _create_mem0_memory(store): + from runtime.node.agent.memory.mem0_memory import Mem0Memory + return Mem0Memory(store) + + +register_memory_store( + "mem0", + config_cls=Mem0MemoryConfig, + factory=_create_mem0_memory, + summary="Mem0 managed memory with semantic search and graph relationships", +) + + class MemoryFactory: @staticmethod def create_memory(store: MemoryStoreConfig) -> MemoryBase: diff --git a/runtime/node/agent/memory/mem0_memory.py b/runtime/node/agent/memory/mem0_memory.py new file mode 100644 index 0000000000..0c6b809ac5 --- /dev/null +++ b/runtime/node/agent/memory/mem0_memory.py @@ -0,0 +1,203 @@ +"""Mem0 managed memory store implementation.""" + +import logging +import time +import uuid +from typing import Any, Dict, List + +from entity.configs import MemoryStoreConfig +from entity.configs.node.memory import Mem0MemoryConfig +from runtime.node.agent.memory.memory_base import ( + MemoryBase, + MemoryContentSnapshot, + MemoryItem, + MemoryWritePayload, +) + +logger = logging.getLogger(__name__) + + +def _get_mem0_client(config: Mem0MemoryConfig): + """Lazy-import mem0ai and create a MemoryClient.""" + try: + from mem0 import MemoryClient + except ImportError: + raise ImportError( + "mem0ai is required for Mem0Memory. Install it with: pip install mem0ai" + ) + + client_kwargs: Dict[str, Any] = {} + if config.api_key: + client_kwargs["api_key"] = config.api_key + if config.org_id: + client_kwargs["org_id"] = config.org_id + if config.project_id: + client_kwargs["project_id"] = config.project_id + + return MemoryClient(**client_kwargs) + + +class Mem0Memory(MemoryBase): + """Memory store backed by Mem0's managed cloud service. + + Mem0 handles embeddings, storage, and semantic search server-side. + No local persistence or embedding computation is needed. + + Important API constraints: + - Agent memories use role="assistant" + agent_id + - user_id and agent_id are stored as separate records in Mem0; + if both are configured, an OR filter is used to search across both scopes. + - search() uses filters dict; add() uses top-level kwargs. + - SDK returns {"memories": [...]} from search. + """ + + def __init__(self, store: MemoryStoreConfig): + config = store.as_config(Mem0MemoryConfig) + if not config: + raise ValueError("Mem0Memory requires a Mem0 memory store configuration") + super().__init__(store) + self.config = config + self.client = _get_mem0_client(config) + self.user_id = config.user_id + self.agent_id = config.agent_id + + # -------- Persistence (no-ops for cloud-managed store) -------- + + def load(self) -> None: + """No-op: Mem0 manages persistence server-side.""" + pass + + def save(self) -> None: + """No-op: Mem0 manages persistence server-side.""" + pass + + # -------- Retrieval -------- + + def _build_search_filters(self, agent_role: str) -> Dict[str, Any]: + """Build the filters dict for Mem0 search. + + Mem0 search requires a filters dict for entity scoping. + user_id and agent_id are stored as separate records, so + when both are configured we use an OR filter to match either. + """ + if self.user_id and self.agent_id: + return { + "OR": [ + {"user_id": self.user_id}, + {"agent_id": self.agent_id}, + ] + } + elif self.user_id: + return {"user_id": self.user_id} + elif self.agent_id: + return {"agent_id": self.agent_id} + else: + # Fallback: use agent_role as agent_id + return {"agent_id": agent_role} + + def retrieve( + self, + agent_role: str, + query: MemoryContentSnapshot, + top_k: int, + similarity_threshold: float, + ) -> List[MemoryItem]: + """Search Mem0 for relevant memories. + + Uses the filters dict to scope by user_id, agent_id, or both + (via OR filter). The SDK returns {"memories": [...]}. + """ + if not query.text.strip(): + return [] + + try: + filters = self._build_search_filters(agent_role) + search_kwargs: Dict[str, Any] = { + "query": query.text, + "top_k": top_k, + "filters": filters, + } + if similarity_threshold >= 0: + search_kwargs["threshold"] = similarity_threshold + + response = self.client.search(**search_kwargs) + + # SDK returns {"memories": [...]} — extract the list + if isinstance(response, dict): + raw_results = response.get("memories", response.get("results", [])) + else: + raw_results = response + except Exception as e: + logger.error("Mem0 search failed: %s", e) + return [] + + items: List[MemoryItem] = [] + for entry in raw_results: + item = MemoryItem( + id=entry.get("id", f"mem0_{uuid.uuid4().hex}"), + content_summary=entry.get("memory", ""), + metadata={ + "agent_role": agent_role, + "score": entry.get("score"), + "categories": entry.get("categories", []), + "source": "mem0", + }, + timestamp=time.time(), + ) + items.append(item) + + return items + + # -------- Update -------- + + def update(self, payload: MemoryWritePayload) -> None: + """Store a memory in Mem0. + + Uses role="assistant" + agent_id for agent-generated memories, + and role="user" + user_id for user-scoped memories. + """ + snapshot = payload.output_snapshot or payload.input_snapshot + if not snapshot or not snapshot.text.strip(): + return + + messages = self._build_messages(payload) + if not messages: + return + + add_kwargs: Dict[str, Any] = {"messages": messages} + + # Determine scoping: agent_id takes precedence for agent-generated content + if self.agent_id: + add_kwargs["agent_id"] = self.agent_id + elif self.user_id: + add_kwargs["user_id"] = self.user_id + else: + # Default: use agent_role as agent_id + add_kwargs["agent_id"] = payload.agent_role + + try: + self.client.add(**add_kwargs) + except Exception as e: + logger.error("Mem0 add failed: %s", e) + + def _build_messages(self, payload: MemoryWritePayload) -> List[Dict[str, str]]: + """Build Mem0-compatible message list from write payload. + + Agent-generated content uses role="assistant". + User input uses role="user". + """ + messages: List[Dict[str, str]] = [] + + if payload.inputs_text and payload.inputs_text.strip(): + messages.append({ + "role": "user", + "content": payload.inputs_text.strip(), + }) + + if payload.output_snapshot and payload.output_snapshot.text.strip(): + messages.append({ + "role": "assistant", + "content": payload.output_snapshot.text.strip(), + }) + + return messages diff --git a/tests/test_mem0_memory.py b/tests/test_mem0_memory.py new file mode 100644 index 0000000000..09617619f3 --- /dev/null +++ b/tests/test_mem0_memory.py @@ -0,0 +1,384 @@ +"""Tests for Mem0 memory store implementation.""" + +from unittest.mock import MagicMock, patch +import pytest + +from entity.configs.node.memory import Mem0MemoryConfig +from runtime.node.agent.memory.memory_base import ( + MemoryContentSnapshot, + MemoryItem, + MemoryWritePayload, +) + + +def _make_store(user_id=None, agent_id=None, api_key="test-key"): + """Build a minimal MemoryStoreConfig mock for Mem0Memory.""" + mem0_cfg = MagicMock(spec=Mem0MemoryConfig) + mem0_cfg.api_key = api_key + mem0_cfg.org_id = None + mem0_cfg.project_id = None + mem0_cfg.user_id = user_id + mem0_cfg.agent_id = agent_id + + store = MagicMock() + store.name = "test_mem0" + + # Return correct config type based on the requested class + def _as_config_side_effect(expected_type, **kwargs): + if expected_type is Mem0MemoryConfig: + return mem0_cfg + return None + + store.as_config.side_effect = _as_config_side_effect + return store + + +def _make_mem0_memory(user_id=None, agent_id=None): + """Create a Mem0Memory with a mocked client.""" + with patch("runtime.node.agent.memory.mem0_memory._get_mem0_client") as mock_get: + mock_client = MagicMock() + mock_get.return_value = mock_client + from runtime.node.agent.memory.mem0_memory import Mem0Memory + store = _make_store(user_id=user_id, agent_id=agent_id) + memory = Mem0Memory(store) + return memory, mock_client + + +class TestMem0MemoryRetrieve: + + def test_retrieve_with_agent_id(self): + """Retrieve passes agent_id in filters dict to SDK search.""" + memory, client = _make_mem0_memory(agent_id="agent-1") + client.search.return_value = { + "memories": [ + {"id": "m1", "memory": "test fact", "score": 0.95}, + ] + } + + query = MemoryContentSnapshot(text="what do you know?") + results = memory.retrieve("writer", query, top_k=5, similarity_threshold=-1.0) + + client.search.assert_called_once() + call_kwargs = client.search.call_args[1] + assert call_kwargs["filters"] == {"agent_id": "agent-1"} + assert len(results) == 1 + assert results[0].content_summary == "test fact" + assert results[0].metadata["source"] == "mem0" + + def test_retrieve_with_user_id(self): + """Retrieve passes user_id in filters dict to SDK search.""" + memory, client = _make_mem0_memory(user_id="user-1") + client.search.return_value = { + "memories": [ + {"id": "m1", "memory": "user pref", "score": 0.9}, + ] + } + + query = MemoryContentSnapshot(text="preferences") + results = memory.retrieve("assistant", query, top_k=3, similarity_threshold=-1.0) + + call_kwargs = client.search.call_args[1] + assert call_kwargs["filters"] == {"user_id": "user-1"} + assert len(results) == 1 + + def test_retrieve_with_both_ids_uses_or_filter(self): + """When both user_id and agent_id are set, an OR filter is used.""" + memory, client = _make_mem0_memory(user_id="user-1", agent_id="agent-1") + client.search.return_value = { + "memories": [ + {"id": "u1", "memory": "user fact", "score": 0.8}, + {"id": "a1", "memory": "agent fact", "score": 0.9}, + ] + } + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("writer", query, top_k=5, similarity_threshold=-1.0) + + client.search.assert_called_once() + call_kwargs = client.search.call_args[1] + assert call_kwargs["filters"] == { + "OR": [ + {"user_id": "user-1"}, + {"agent_id": "agent-1"}, + ] + } + assert len(results) == 2 + + def test_retrieve_fallback_uses_agent_role(self): + """When no IDs configured, fall back to agent_role as agent_id in filters.""" + memory, client = _make_mem0_memory() + client.search.return_value = {"memories": []} + + query = MemoryContentSnapshot(text="test") + memory.retrieve("coder", query, top_k=3, similarity_threshold=-1.0) + + call_kwargs = client.search.call_args[1] + assert call_kwargs["filters"] == {"agent_id": "coder"} + + def test_retrieve_empty_query_returns_empty(self): + """Empty query text returns empty without calling API.""" + memory, client = _make_mem0_memory(agent_id="a1") + + query = MemoryContentSnapshot(text=" ") + results = memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + + assert results == [] + client.search.assert_not_called() + + def test_retrieve_api_error_returns_empty(self): + """API errors are caught and return empty list.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.search.side_effect = Exception("API down") + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + + assert results == [] + + def test_retrieve_respects_top_k(self): + """top_k is passed to Mem0 search.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.search.return_value = {"memories": []} + + query = MemoryContentSnapshot(text="test") + memory.retrieve("writer", query, top_k=7, similarity_threshold=-1.0) + + call_kwargs = client.search.call_args[1] + assert call_kwargs["top_k"] == 7 + + def test_retrieve_passes_threshold_when_non_negative(self): + """Non-negative similarity_threshold is forwarded to Mem0.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.search.return_value = {"memories": []} + + query = MemoryContentSnapshot(text="test") + memory.retrieve("writer", query, top_k=3, similarity_threshold=0.5) + + call_kwargs = client.search.call_args[1] + assert call_kwargs["threshold"] == 0.5 + + def test_retrieve_passes_zero_threshold(self): + """A threshold of 0.0 is a valid value and should be sent.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.search.return_value = {"memories": []} + + query = MemoryContentSnapshot(text="test") + memory.retrieve("writer", query, top_k=3, similarity_threshold=0.0) + + call_kwargs = client.search.call_args[1] + assert call_kwargs["threshold"] == 0.0 + + def test_retrieve_skips_threshold_when_negative(self): + """Negative similarity_threshold is not sent to Mem0.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.search.return_value = {"memories": []} + + query = MemoryContentSnapshot(text="test") + memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + + call_kwargs = client.search.call_args[1] + assert "threshold" not in call_kwargs + + def test_retrieve_handles_legacy_results_key(self): + """Handles SDK response with 'results' key (older SDK versions).""" + memory, client = _make_mem0_memory(agent_id="a1") + client.search.return_value = { + "results": [ + {"id": "m1", "memory": "legacy format", "score": 0.8}, + ] + } + + query = MemoryContentSnapshot(text="test") + results = memory.retrieve("writer", query, top_k=3, similarity_threshold=-1.0) + + assert len(results) == 1 + assert results[0].content_summary == "legacy format" + + +class TestMem0MemoryUpdate: + + def test_update_with_agent_id_uses_assistant_role(self): + """Agent-scoped update sends role=assistant messages with agent_id.""" + memory, client = _make_mem0_memory(agent_id="agent-1") + client.add.return_value = [{"id": "new", "event": "ADD"}] + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="Write about AI", + input_snapshot=MemoryContentSnapshot(text="Write about AI"), + output_snapshot=MemoryContentSnapshot(text="AI is transformative..."), + ) + memory.update(payload) + + client.add.assert_called_once() + call_kwargs = client.add.call_args[1] + assert call_kwargs["agent_id"] == "agent-1" + assert "user_id" not in call_kwargs + messages = call_kwargs["messages"] + assert messages[0]["role"] == "user" + assert messages[1]["role"] == "assistant" + + def test_update_with_user_id(self): + """User-scoped update uses user_id, not agent_id.""" + memory, client = _make_mem0_memory(user_id="user-1") + client.add.return_value = [] + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="I prefer Python", + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="Noted your preference"), + ) + memory.update(payload) + + call_kwargs = client.add.call_args[1] + assert call_kwargs["user_id"] == "user-1" + assert "agent_id" not in call_kwargs + + def test_update_fallback_uses_agent_role(self): + """When no IDs configured, uses agent_role as agent_id.""" + memory, client = _make_mem0_memory() + client.add.return_value = [] + + payload = MemoryWritePayload( + agent_role="coder", + inputs_text="test input", + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="test output"), + ) + memory.update(payload) + + call_kwargs = client.add.call_args[1] + assert call_kwargs["agent_id"] == "coder" + + def test_update_with_both_ids_prefers_agent_id(self): + """When both user_id and agent_id configured, agent_id takes precedence for writes.""" + memory, client = _make_mem0_memory(user_id="user-1", agent_id="agent-1") + client.add.return_value = [] + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="input", + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="output"), + ) + memory.update(payload) + + call_kwargs = client.add.call_args[1] + assert call_kwargs["agent_id"] == "agent-1" + assert "user_id" not in call_kwargs + + def test_update_empty_output_is_noop(self): + """Empty output snapshot skips API call.""" + memory, client = _make_mem0_memory(agent_id="a1") + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="", + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text=" "), + ) + memory.update(payload) + + client.add.assert_not_called() + + def test_update_no_snapshot_is_noop(self): + """No snapshot at all skips API call.""" + memory, client = _make_mem0_memory(agent_id="a1") + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="test", + input_snapshot=None, + output_snapshot=None, + ) + memory.update(payload) + + client.add.assert_not_called() + + def test_update_api_error_does_not_raise(self): + """API errors are logged but do not propagate.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.add.side_effect = Exception("API error") + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="test", + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="output"), + ) + # Should not raise + memory.update(payload) + + +class TestMem0MemoryLoadSave: + + def test_load_is_noop(self): + """load() does nothing for cloud-managed store.""" + memory, _ = _make_mem0_memory(agent_id="a1") + memory.load() # Should not raise + + def test_save_is_noop(self): + """save() does nothing for cloud-managed store.""" + memory, _ = _make_mem0_memory(agent_id="a1") + memory.save() # Should not raise + + +class TestMem0MemoryConfig: + + def test_config_from_dict(self): + """Config parses from dict correctly.""" + data = { + "api_key": "test-key", + "user_id": "u1", + "org_id": "org-1", + } + config = Mem0MemoryConfig.from_dict(data, path="test") + assert config.api_key == "test-key" + assert config.user_id == "u1" + assert config.org_id == "org-1" + assert config.agent_id is None + assert config.project_id is None + + def test_config_field_specs_exist(self): + """FIELD_SPECS are defined for UI generation.""" + specs = Mem0MemoryConfig.field_specs() + assert "api_key" in specs + assert "user_id" in specs + assert "agent_id" in specs + assert specs["api_key"].required is True + + def test_config_requires_api_key(self): + """Config raises ConfigError when api_key is missing.""" + from entity.configs.base import ConfigError + + data = {"agent_id": "a1"} + with pytest.raises(ConfigError): + Mem0MemoryConfig.from_dict(data, path="test") + + +class TestMem0MemoryConstructor: + + def test_raises_on_wrong_config_type(self): + """Mem0Memory raises ValueError when store has wrong config type.""" + from runtime.node.agent.memory.mem0_memory import Mem0Memory + + store = MagicMock() + store.name = "bad_store" + store.as_config.return_value = None # Wrong config type + + with pytest.raises(ValueError, match="Mem0 memory store configuration"): + Mem0Memory(store) + + def test_import_error_when_mem0ai_missing(self): + """Helpful ImportError when mem0ai is not installed.""" + from runtime.node.agent.memory.mem0_memory import _get_mem0_client + + mem0_cfg = MagicMock(spec=Mem0MemoryConfig) + mem0_cfg.api_key = "test" + mem0_cfg.org_id = None + mem0_cfg.project_id = None + + with patch.dict("sys.modules", {"mem0": None}): + with pytest.raises(ImportError, match="pip install mem0ai"): + _get_mem0_client(mem0_cfg) diff --git a/yaml_instance/demo_mem0_memory.yaml b/yaml_instance/demo_mem0_memory.yaml new file mode 100644 index 0000000000..6206e8fa16 --- /dev/null +++ b/yaml_instance/demo_mem0_memory.yaml @@ -0,0 +1,47 @@ +version: 0.4.0 +vars: {} +graph: + id: '' + description: Memory-backed conversation using Mem0 managed memory service. + is_majority_voting: false + nodes: + - id: writer + type: agent + config: + base_url: ${BASE_URL} + api_key: ${API_KEY} + provider: openai + name: gpt-4o + role: | + You are a knowledgeable writer. Use your memories to build on past interactions. + If memory sections are provided (wrapped by ===== Related Memories =====), + incorporate relevant context from those memories into your response. + params: + temperature: 0.7 + max_tokens: 2000 + memories: + - name: mem0_store + top_k: 5 + retrieve_stage: + - gen + read: true + write: true + edges: [] + memory: + # Agent-scoped memory: uses agent_id for storing and retrieving + - name: mem0_store + type: mem0 + config: + api_key: ${MEM0_API_KEY} + agent_id: writer-agent + + # Alternative: User-scoped memory (uncomment to use instead) + # - name: mem0_store + # type: mem0 + # config: + # api_key: ${MEM0_API_KEY} + # user_id: project-user-123 + start: + - writer + end: [] + initial_instruction: '' From 531741545e589a44b036681d7f4e721296c8f808 Mon Sep 17 00:00:00 2001 From: kartik-mem0 Date: Mon, 6 Apr 2026 13:16:46 +0530 Subject: [PATCH 2/3] refactor: update Mem0Memory to use independent user/agent scoping and exclude assistant output --- docs/user_guide/en/modules/memory.md | 4 +- docs/user_guide/zh/modules/memory.md | 19 ++++ runtime/node/agent/memory/mem0_memory.py | 62 ++++++++----- tests/test_mem0_memory.py | 105 +++++++++++++++++++---- yaml_instance/demo_mem0_memory.yaml | 12 +-- 5 files changed, 150 insertions(+), 52 deletions(-) diff --git a/docs/user_guide/en/modules/memory.md b/docs/user_guide/en/modules/memory.md index f494cb2fcc..d0e66ab788 100755 --- a/docs/user_guide/en/modules/memory.md +++ b/docs/user_guide/en/modules/memory.md @@ -111,9 +111,9 @@ This schema lets multimodal outputs flow into Memory/Thinking modules without ex ### 5.4 Mem0Memory - **Config** – Requires `api_key` (from [app.mem0.ai](https://app.mem0.ai)). Optional `user_id`, `agent_id`, `org_id`, `project_id` for scoping. -- **Important**: `user_id` and `agent_id` are mutually exclusive in Mem0 API calls. If both are configured, two separate searches are made and results merged. For writes, `agent_id` takes precedence. Agent-generated content is stored with `role: "assistant"`. +- **Entity scoping**: `user_id` and `agent_id` are independent dimensions — both can be included simultaneously in `add()` and `search()` calls. When both are configured, retrieval uses an OR filter (`{"OR": [{"user_id": ...}, {"agent_id": ...}]}`) to search across both scopes. Writes include both IDs when available. - **Retrieval** – Uses Mem0's server-side semantic search. Supports `top_k` and `similarity_threshold` via `MemoryAttachmentConfig`. -- **Write** – `update()` sends conversation messages to Mem0 via the SDK. Agent outputs use `role: "assistant"`, user inputs use `role: "user"`. +- **Write** – `update()` sends only user input to Mem0 via the SDK (as `role: "user"` messages). Assistant output is excluded to prevent noise memories from the LLM's responses being extracted as facts. - **Persistence** – Fully cloud-managed. `load()` and `save()` are no-ops. Memories persist across runs and sessions automatically. - **Dependencies** – Requires `mem0ai` package (`pip install mem0ai`). diff --git a/docs/user_guide/zh/modules/memory.md b/docs/user_guide/zh/modules/memory.md index ffb1904f03..453e0d4d2e 100755 --- a/docs/user_guide/zh/modules/memory.md +++ b/docs/user_guide/zh/modules/memory.md @@ -32,12 +32,23 @@ memory: model: text-embedding-3-small ``` +### Mem0 Memory 配置 +```yaml +memory: + - name: agent_memory + type: mem0 + config: + api_key: ${MEM0_API_KEY} + agent_id: my-agent +``` + ## 3. 内置 Memory Store 对比 | 类型 | 路径 | 特点 | 适用场景 | | --- | --- | --- | --- | | `simple` | `node/agent/memory/simple_memory.py` | 运行结束后可选择落盘(JSON);使用向量搜索(FAISS)+语义重打分;支持读写 | 小规模对话记忆、快速原型 | | `file` | `node/agent/memory/file_memory.py` | 将指定文件/目录切片为向量索引,只读;自动检测文件变更并更新索引 | 知识库、文档问答 | | `blackboard` | `node/agent/memory/blackboard_memory.py` | 轻量附加日志,按时间/条数裁剪;不依赖向量检索 | 简易广播板、流水线调试 | +| `mem0` | `node/agent/memory/mem0_memory.py` | 由 Mem0 云端托管;支持语义搜索 + 图关系;无需本地 embedding 或持久化。需安装 `mem0ai` 包。 | 生产级记忆、跨会话持久化、多 Agent 记忆共享 | > 所有内置 store 都会在 `register_memory_store()` 中注册,摘要可通过 `MemoryStoreConfig.field_specs()` 在 UI 中展示。 @@ -100,6 +111,14 @@ nodes: - **检索**:直接返回最近 `top_k` 条,按时间排序。 - **写入**:`update()` 以 append 方式存储最新的输入/输出 snapshot(文本 + 块 + 附件信息),不生成向量,适合事件流或人工批注。 +### 5.4 Mem0Memory +- **配置**:必须提供 `api_key`(从 [app.mem0.ai](https://app.mem0.ai) 获取)。可选参数 `user_id`、`agent_id`、`org_id`、`project_id` 用于记忆范围控制。 +- **实体范围**:`user_id` 和 `agent_id` 是独立的维度,可在 `add()` 和 `search()` 调用中同时使用。若同时配置,检索时使用 OR 过滤器(`{"OR": [{"user_id": ...}, {"agent_id": ...}]}`)在一次 API 调用中搜索两个范围。写入时两个 ID 同时包含。 +- **检索**:使用 Mem0 服务端语义搜索。通过 `MemoryAttachmentConfig` 中的 `top_k` 和 `similarity_threshold` 控制。 +- **写入**:`update()` 仅将用户输入(`role: "user"` 消息)发送至 Mem0。不包含 Agent 输出,以避免 LLM 响应中的内容被提取为噪声记忆。 +- **持久化**:完全由云端托管。`load()` 和 `save()` 为空操作(no-op)。记忆在不同运行和会话间自动持久化。 +- **依赖**:需安装 `mem0ai` 包(`pip install mem0ai`)。 + ## 6. EmbeddingConfig 提示 - 字段:`provider`, `model`, `api_key`, `base_url`, `params`。 - `provider=openai` 时使用 `openai.OpenAI` 客户端,可配置 `base_url` 以兼容兼容层。 diff --git a/runtime/node/agent/memory/mem0_memory.py b/runtime/node/agent/memory/mem0_memory.py index 0c6b809ac5..6e171e30a1 100644 --- a/runtime/node/agent/memory/mem0_memory.py +++ b/runtime/node/agent/memory/mem0_memory.py @@ -1,6 +1,7 @@ """Mem0 managed memory store implementation.""" import logging +import re import time import uuid from typing import Any, Dict, List @@ -45,8 +46,8 @@ class Mem0Memory(MemoryBase): Important API constraints: - Agent memories use role="assistant" + agent_id - - user_id and agent_id are stored as separate records in Mem0; - if both are configured, an OR filter is used to search across both scopes. + - user_id and agent_id are independent scoping dimensions and can be + combined in both add() and search() calls. - search() uses filters dict; add() uses top-level kwargs. - SDK returns {"memories": [...]} from search. """ @@ -151,53 +152,68 @@ def retrieve( # -------- Update -------- def update(self, payload: MemoryWritePayload) -> None: - """Store a memory in Mem0. + """Store user input as a memory in Mem0. - Uses role="assistant" + agent_id for agent-generated memories, - and role="user" + user_id for user-scoped memories. + Only user input is sent for extraction. Assistant output is excluded + to prevent noise memories from the LLM's responses. """ - snapshot = payload.output_snapshot or payload.input_snapshot - if not snapshot or not snapshot.text.strip(): + raw_input = payload.inputs_text or "" + if not raw_input.strip(): return messages = self._build_messages(payload) if not messages: return - add_kwargs: Dict[str, Any] = {"messages": messages} + add_kwargs: Dict[str, Any] = { + "messages": messages, + "infer": True, + } - # Determine scoping: agent_id takes precedence for agent-generated content + # Include both user_id and agent_id when available — they are + # independent scoping dimensions in Mem0, not mutually exclusive. if self.agent_id: add_kwargs["agent_id"] = self.agent_id - elif self.user_id: + if self.user_id: add_kwargs["user_id"] = self.user_id - else: - # Default: use agent_role as agent_id + + # Fallback when neither is configured + if "agent_id" not in add_kwargs and "user_id" not in add_kwargs: add_kwargs["agent_id"] = payload.agent_role try: - self.client.add(**add_kwargs) + result = self.client.add(**add_kwargs) + logger.info("Mem0 add result: %s", result) except Exception as e: logger.error("Mem0 add failed: %s", e) + @staticmethod + def _clean_pipeline_text(text: str) -> str: + """Strip ChatDev pipeline headers so Mem0 sees clean conversational text. + + The executor wraps each input with '=== INPUT FROM () ===' + headers. Mem0's extraction LLM treats these as system metadata and skips + them, resulting in zero memories extracted. + """ + cleaned = re.sub(r"===\s*INPUT FROM\s+\S+\s*\(\w+\)\s*===\s*", "", text) + return cleaned.strip() + def _build_messages(self, payload: MemoryWritePayload) -> List[Dict[str, str]]: """Build Mem0-compatible message list from write payload. - Agent-generated content uses role="assistant". - User input uses role="user". + Only sends user input to Mem0. Assistant output is excluded because + Mem0's extraction LLM processes ALL messages and extracts facts from + assistant responses too, creating noise memories like "Assistant says + Python is fascinating" instead of actual user facts. """ messages: List[Dict[str, str]] = [] - if payload.inputs_text and payload.inputs_text.strip(): + raw_input = payload.inputs_text or "" + clean_input = self._clean_pipeline_text(raw_input) + if clean_input: messages.append({ "role": "user", - "content": payload.inputs_text.strip(), - }) - - if payload.output_snapshot and payload.output_snapshot.text.strip(): - messages.append({ - "role": "assistant", - "content": payload.output_snapshot.text.strip(), + "content": clean_input, }) return messages diff --git a/tests/test_mem0_memory.py b/tests/test_mem0_memory.py index 09617619f3..da34270ff9 100644 --- a/tests/test_mem0_memory.py +++ b/tests/test_mem0_memory.py @@ -197,8 +197,8 @@ def test_retrieve_handles_legacy_results_key(self): class TestMem0MemoryUpdate: - def test_update_with_agent_id_uses_assistant_role(self): - """Agent-scoped update sends role=assistant messages with agent_id.""" + def test_update_sends_only_user_input(self): + """Update sends only user input, not assistant output, to prevent noise.""" memory, client = _make_mem0_memory(agent_id="agent-1") client.add.return_value = [{"id": "new", "event": "ADD"}] @@ -215,8 +215,26 @@ def test_update_with_agent_id_uses_assistant_role(self): assert call_kwargs["agent_id"] == "agent-1" assert "user_id" not in call_kwargs messages = call_kwargs["messages"] + assert len(messages) == 1 assert messages[0]["role"] == "user" - assert messages[1]["role"] == "assistant" + assert messages[0]["content"] == "Write about AI" + + def test_update_does_not_send_async_mode(self): + """Update does not send deprecated async_mode parameter.""" + memory, client = _make_mem0_memory(agent_id="agent-1") + client.add.return_value = [] + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="test", + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="output"), + ) + memory.update(payload) + + call_kwargs = client.add.call_args[1] + assert "async_mode" not in call_kwargs + assert call_kwargs["infer"] is True def test_update_with_user_id(self): """User-scoped update uses user_id, not agent_id.""" @@ -227,7 +245,7 @@ def test_update_with_user_id(self): agent_role="writer", inputs_text="I prefer Python", input_snapshot=None, - output_snapshot=MemoryContentSnapshot(text="Noted your preference"), + output_snapshot=None, ) memory.update(payload) @@ -244,15 +262,15 @@ def test_update_fallback_uses_agent_role(self): agent_role="coder", inputs_text="test input", input_snapshot=None, - output_snapshot=MemoryContentSnapshot(text="test output"), + output_snapshot=None, ) memory.update(payload) call_kwargs = client.add.call_args[1] assert call_kwargs["agent_id"] == "coder" - def test_update_with_both_ids_prefers_agent_id(self): - """When both user_id and agent_id configured, agent_id takes precedence for writes.""" + def test_update_with_both_ids_includes_both(self): + """When both user_id and agent_id configured, both are included in add() call.""" memory, client = _make_mem0_memory(user_id="user-1", agent_id="agent-1") client.add.return_value = [] @@ -260,37 +278,37 @@ def test_update_with_both_ids_prefers_agent_id(self): agent_role="writer", inputs_text="input", input_snapshot=None, - output_snapshot=MemoryContentSnapshot(text="output"), + output_snapshot=None, ) memory.update(payload) call_kwargs = client.add.call_args[1] assert call_kwargs["agent_id"] == "agent-1" - assert "user_id" not in call_kwargs + assert call_kwargs["user_id"] == "user-1" - def test_update_empty_output_is_noop(self): - """Empty output snapshot skips API call.""" + def test_update_empty_input_is_noop(self): + """Empty inputs_text skips API call.""" memory, client = _make_mem0_memory(agent_id="a1") payload = MemoryWritePayload( agent_role="writer", - inputs_text="", + inputs_text=" ", input_snapshot=None, - output_snapshot=MemoryContentSnapshot(text=" "), + output_snapshot=MemoryContentSnapshot(text="some output"), ) memory.update(payload) client.add.assert_not_called() - def test_update_no_snapshot_is_noop(self): - """No snapshot at all skips API call.""" + def test_update_no_input_is_noop(self): + """No inputs_text skips API call.""" memory, client = _make_mem0_memory(agent_id="a1") payload = MemoryWritePayload( agent_role="writer", - inputs_text="test", + inputs_text="", input_snapshot=None, - output_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="output"), ) memory.update(payload) @@ -303,14 +321,63 @@ def test_update_api_error_does_not_raise(self): payload = MemoryWritePayload( agent_role="writer", - inputs_text="test", + inputs_text="test user input", input_snapshot=None, - output_snapshot=MemoryContentSnapshot(text="output"), + output_snapshot=None, ) # Should not raise memory.update(payload) +class TestMem0MemoryPipelineTextCleaning: + + def test_strips_input_from_task_header(self): + """Pipeline headers like '=== INPUT FROM TASK (user) ===' are stripped.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.add.return_value = [] + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text="=== INPUT FROM TASK (user) ===\n\nMy name is Alex, I love Python", + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="Nice to meet you Alex!"), + ) + memory.update(payload) + + call_kwargs = client.add.call_args[1] + messages = call_kwargs["messages"] + assert messages[0]["role"] == "user" + assert messages[0]["content"] == "My name is Alex, I love Python" + assert "INPUT FROM" not in messages[0]["content"] + + def test_strips_multiple_input_headers(self): + """Multiple pipeline headers from different sources are all stripped.""" + memory, client = _make_mem0_memory(agent_id="a1") + client.add.return_value = [] + + payload = MemoryWritePayload( + agent_role="writer", + inputs_text=( + "=== INPUT FROM TASK (user) ===\n\nHello\n\n" + "=== INPUT FROM reviewer (assistant) ===\n\nWorld" + ), + input_snapshot=None, + output_snapshot=MemoryContentSnapshot(text="Hi!"), + ) + memory.update(payload) + + call_kwargs = client.add.call_args[1] + user_content = call_kwargs["messages"][0]["content"] + assert "INPUT FROM" not in user_content + assert "Hello" in user_content + assert "World" in user_content + + def test_clean_text_without_headers_unchanged(self): + """Text without pipeline headers passes through unchanged.""" + from runtime.node.agent.memory.mem0_memory import Mem0Memory + assert Mem0Memory._clean_pipeline_text("Just normal text") == "Just normal text" + + class TestMem0MemoryLoadSave: def test_load_is_noop(self): diff --git a/yaml_instance/demo_mem0_memory.yaml b/yaml_instance/demo_mem0_memory.yaml index 6206e8fa16..0bb5e11312 100644 --- a/yaml_instance/demo_mem0_memory.yaml +++ b/yaml_instance/demo_mem0_memory.yaml @@ -28,19 +28,15 @@ graph: write: true edges: [] memory: - # Agent-scoped memory: uses agent_id for storing and retrieving + # User-scoped: extracts facts about the user (name, preferences, etc.) + # Agent-scoped: extracts what the agent learned (decisions, context) + # Both can be used together for different memory dimensions. - name: mem0_store type: mem0 config: api_key: ${MEM0_API_KEY} + user_id: project-user-123 agent_id: writer-agent - - # Alternative: User-scoped memory (uncomment to use instead) - # - name: mem0_store - # type: mem0 - # config: - # api_key: ${MEM0_API_KEY} - # user_id: project-user-123 start: - writer end: [] From 2d9e889b9142e13c2e69659602bffb0a841a9428 Mon Sep 17 00:00:00 2001 From: kartik-mem0 Date: Mon, 6 Apr 2026 13:23:04 +0530 Subject: [PATCH 3/3] chore: update the openai model to use the latest version --- yaml_instance/demo_mem0_memory.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yaml_instance/demo_mem0_memory.yaml b/yaml_instance/demo_mem0_memory.yaml index 0bb5e11312..9f1a2fcd4e 100644 --- a/yaml_instance/demo_mem0_memory.yaml +++ b/yaml_instance/demo_mem0_memory.yaml @@ -11,7 +11,7 @@ graph: base_url: ${BASE_URL} api_key: ${API_KEY} provider: openai - name: gpt-4o + name: gpt-5.4 role: | You are a knowledgeable writer. Use your memories to build on past interactions. If memory sections are provided (wrapped by ===== Related Memories =====),