From 6c570711821ee8f10ac635b8e5fc66a0d69642ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 9 Oct 2025 14:49:20 +0800 Subject: [PATCH 01/10] fix: nebula search bug --- src/memos/graph_dbs/nebular.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 66ad894ad..e9aa505e8 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -985,8 +985,7 @@ def search_by_embedding( dim = len(vector) vector_str = ",".join(f"{float(x)}" for x in vector) gql_vector = f"VECTOR<{dim}, FLOAT>([{vector_str}])" - - where_clauses = [] + where_clauses = ["n.embedding_1024 IS NOT NULL"] if scope: where_clauses.append(f'n.memory_type = "{scope}"') if status: @@ -1011,12 +1010,9 @@ def search_by_embedding( MATCH (n@Memory) {where_clause} ORDER BY inner_product(n.{self.dim_field}, {gql_vector}) DESC - APPROXIMATE LIMIT {top_k} - OPTIONS {{ METRIC: IP, TYPE: IVF, NPROBE: 8 }} RETURN n.id AS id, inner_product(n.{self.dim_field}, {gql_vector}) AS score """ - try: result = self.execute_query(gql) except Exception as e: From b5546b43b6420a5cfdb681620a4fdaa8f9d49e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 9 Oct 2025 15:51:10 +0800 Subject: [PATCH 02/10] fix: nebula search bug --- src/memos/graph_dbs/nebular.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index e9aa505e8..ef6714366 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -1007,12 +1007,12 @@ def search_by_embedding( where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else "" gql = f""" + let a = {gql_vector} MATCH (n@Memory) {where_clause} - ORDER BY inner_product(n.{self.dim_field}, {gql_vector}) DESC + ORDER BY inner_product(n.{self.dim_field}, a) DESC LIMIT {top_k} - RETURN n.id AS id, inner_product(n.{self.dim_field}, {gql_vector}) AS score - """ + RETURN n.id AS id, inner_product(n.{self.dim_field}, a) AS score""" try: result = self.execute_query(gql) except Exception as e: From e3b0606296c514901205acf040999cac76db6071 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 9 Oct 2025 16:25:31 +0800 Subject: [PATCH 03/10] fix: auto create bug --- src/memos/graph_dbs/nebular.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index ef6714366..c559db694 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -188,6 +188,19 @@ def _get_or_create_shared_client(cls, cfg: NebulaGraphDBConfig) -> tuple[str, "N client = cls._CLIENT_CACHE.get(key) if client is None: # Connection setting + + tmp_client = NebulaClient( + hosts=cfg.uri, + username=cfg.user, + password=cfg.password, + session_config=SessionConfig(graph=None), + session_pool_config=SessionPoolConfig(size=1, wait_timeout=3000), + ) + try: + cls._ensure_space_exists(tmp_client, cfg) + finally: + tmp_client.close() + conn_conf: ConnectionConfig | None = getattr(cfg, "conn_config", None) if conn_conf is None: conn_conf = ConnectionConfig.from_defults( @@ -1467,6 +1480,25 @@ def merge_nodes(self, id1: str, id2: str) -> str: """ raise NotImplementedError + @classmethod + def _ensure_space_exists(cls, tmp_client, cfg): + """Lightweight check to ensure target graph (space) exists.""" + db_name = getattr(cfg, "space", None) + if not db_name: + logger.warning("[NebulaGraphDBSync] No `space` specified in cfg.") + return + + try: + res = tmp_client.execute("SHOW GRAPHS;") + existing = {row.values()[0].as_string() for row in res} + if db_name not in existing: + tmp_client.execute(f"CREATE GRAPH IF NOT EXISTS `{db_name}` TYPED MemOSBgeM3Type;") + logger.info(f"✅ Graph `{db_name}` created before session binding.") + else: + logger.debug(f"Graph `{db_name}` already exists.") + except Exception: + logger.exception("[NebulaGraphDBSync] Failed to ensure space exists") + @timed def _ensure_database_exists(self): graph_type_name = "MemOSBgeM3Type" From 0d202a6448b9a13099e3ee87e5b6c74d6b95d013 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 9 Oct 2025 16:27:53 +0800 Subject: [PATCH 04/10] feat: add single-db-only assertion --- src/memos/graph_dbs/nebular.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index c559db694..3c97afa0a 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -331,6 +331,7 @@ def __init__(self, config: NebulaGraphDBConfig): } """ + assert config.use_multi_db is False, "Multi-DB MODE IS NOT SUPPORTED" self.config = config self.db_name = config.space self.user_name = config.user_name From c2f135fd579937a1465084b2a8327f60bca2c77e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Fri, 10 Oct 2025 11:25:50 +0800 Subject: [PATCH 05/10] feat: make count_nodes support optional memory_type filtering --- src/memos/graph_dbs/nebular.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 3c97afa0a..3b1aabde4 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -451,6 +451,10 @@ def remove_oldest_memory(self, memory_type: str, keep_latest: int) -> None: OFFSET {keep_latest} DETACH DELETE n """ + try: + self.execute_query(query) + except Exception as e: + logger.warning(f"Delete old mem error: {e}") self.execute_query(query) @timed @@ -611,14 +615,19 @@ def get_memory_count(self, memory_type: str) -> int: return -1 @timed - def count_nodes(self, scope: str) -> int: - query = f""" - MATCH (n@Memory) - WHERE n.memory_type = "{scope}" - """ + def count_nodes(self, scope: str | None = None) -> int: + query = "MATCH (n@Memory)" + conditions = [] + + if scope: + conditions.append(f'n.memory_type = "{scope}"') if not self.config.use_multi_db and self.config.user_name: user_name = self.config.user_name - query += f"\nAND n.user_name = '{user_name}'" + conditions.append(f"n.user_name = '{user_name}'") + + if conditions: + query += "\nWHERE " + " AND ".join(conditions) + query += "\nRETURN count(n) AS count" result = self.execute_query(query) From 30d492a07db0316aee88108abb3de7f3f5b808c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Fri, 10 Oct 2025 11:27:45 +0800 Subject: [PATCH 06/10] fix: dim_field when filter non-embedding nodes --- src/memos/graph_dbs/nebular.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 3b1aabde4..5b01c9790 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -1008,7 +1008,7 @@ def search_by_embedding( dim = len(vector) vector_str = ",".join(f"{float(x)}" for x in vector) gql_vector = f"VECTOR<{dim}, FLOAT>([{vector_str}])" - where_clauses = ["n.embedding_1024 IS NOT NULL"] + where_clauses = [f"n.{self.dim_field} IS NOT NULL"] if scope: where_clauses.append(f'n.memory_type = "{scope}"') if status: From 35555d8419bbbe1ad1a17f16c7e3fef7ed3dcb8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Fri, 10 Oct 2025 14:24:54 +0800 Subject: [PATCH 07/10] feat: add optional whether include embedding when export graph --- src/memos/memories/textual/tree.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index f324f41c9..0048f4a59 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -326,10 +326,10 @@ def load(self, dir: str) -> None: except Exception as e: logger.error(f"An error occurred while loading memories: {e}") - def dump(self, dir: str) -> None: + def dump(self, dir: str, include_embedding: bool = False) -> None: """Dump memories to os.path.join(dir, self.config.memory_filename)""" try: - json_memories = self.graph_store.export_graph() + json_memories = self.graph_store.export_graph(include_embedding=include_embedding) os.makedirs(dir, exist_ok=True) memory_file = os.path.join(dir, self.config.memory_filename) From cf6f8d1a4bc0fd376a8d189023e9d99769144874 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Fri, 10 Oct 2025 14:25:50 +0800 Subject: [PATCH 08/10] fix[WIP]: remove oldest memory update --- .../tree_text_memory/organize/manager.py | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 5cc714806..b0224655c 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -67,30 +67,33 @@ def add(self, memories: list[TextualMemoryItem]) -> list[str]: except Exception as e: logger.exception("Memory processing error: ", exc_info=e) - try: - self.graph_store.remove_oldest_memory( - memory_type="WorkingMemory", keep_latest=self.memory_size["WorkingMemory"] - ) - except Exception: - logger.warning(f"Remove WorkingMemory error: {traceback.format_exc()}") - - try: - self.graph_store.remove_oldest_memory( - memory_type="LongTermMemory", keep_latest=self.memory_size["LongTermMemory"] - ) - except Exception: - logger.warning(f"Remove LongTermMemory error: {traceback.format_exc()}") - - try: - self.graph_store.remove_oldest_memory( - memory_type="UserMemory", keep_latest=self.memory_size["UserMemory"] - ) - except Exception: - logger.warning(f"Remove UserMemory error: {traceback.format_exc()}") + # Only clean up if we're close to or over the limit + self._cleanup_memories_if_needed() self._refresh_memory_size() return added_ids + def _cleanup_memories_if_needed(self) -> None: + """ + Only clean up memories if we're close to or over the limit. + This reduces unnecessary database operations. + """ + cleanup_threshold = 0.8 # Clean up when 80% full + + for memory_type, limit in self.memory_size.items(): + current_count = self.current_memory_size.get(memory_type, 0) + threshold = int(limit * cleanup_threshold) + + # Only clean up if we're at or above the threshold + if current_count >= threshold: + try: + self.graph_store.remove_oldest_memory( + memory_type=memory_type, keep_latest=limit + ) + logger.debug(f"Cleaned up {memory_type}: {current_count} -> {limit}") + except Exception: + logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}") + def replace_working_memory(self, memories: list[TextualMemoryItem]) -> None: """ Replace WorkingMemory From 2790f5ceb9a5fd8780926656f368ecd92587fe4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Fri, 10 Oct 2025 18:03:03 +0800 Subject: [PATCH 09/10] feat: modify nebula search embedding efficiency --- src/memos/graph_dbs/nebular.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 5b01c9790..d79f5ff50 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -1031,7 +1031,7 @@ def search_by_embedding( gql = f""" let a = {gql_vector} - MATCH (n@Memory) + MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */) {where_clause} ORDER BY inner_product(n.{self.dim_field}, a) DESC LIMIT {top_k} From de86311ad216faa53e41a96405df1ba7838b333c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Fri, 10 Oct 2025 20:01:09 +0800 Subject: [PATCH 10/10] fix: modify nebula remove old memory --- src/memos/graph_dbs/nebular.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index d79f5ff50..45656b770 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -443,19 +443,21 @@ def remove_oldest_memory(self, memory_type: str, keep_latest: int) -> None: if not self.config.use_multi_db and self.config.user_name: optional_condition = f"AND n.user_name = '{self.config.user_name}'" - query = f""" - MATCH (n@Memory) - WHERE n.memory_type = '{memory_type}' - {optional_condition} - ORDER BY n.updated_at DESC - OFFSET {keep_latest} - DETACH DELETE n - """ - try: - self.execute_query(query) - except Exception as e: - logger.warning(f"Delete old mem error: {e}") - self.execute_query(query) + count = self.count_nodes(memory_type) + + if count > keep_latest: + delete_query = f""" + MATCH (n@Memory) + WHERE n.memory_type = '{memory_type}' + {optional_condition} + ORDER BY n.updated_at DESC + OFFSET {keep_latest} + DETACH DELETE n + """ + try: + self.execute_query(delete_query) + except Exception as e: + logger.warning(f"Delete old mem error: {e}") @timed def add_node(self, id: str, memory: str, metadata: dict[str, Any]) -> None: