diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 20e02bd0c..de05185d2 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -151,6 +151,10 @@ def __init__(self, config: PolarDBGraphDBConfig): user=user, password=password, dbname=self.db_name, + connect_timeout=60, # Connection timeout in seconds + keepalives_idle=40, # Seconds of inactivity before sending keepalive (should be < server idle timeout) + keepalives_interval=15, # Seconds between keepalive retries + keepalives_count=5, # Number of keepalive retries before considering connection dead ) # Keep a reference to the pool for cleanup @@ -179,7 +183,7 @@ def _get_config_value(self, key: str, default=None): else: return getattr(self.config, key, default) - def _get_connection(self): + def _get_connection_old(self): """Get a connection from the pool.""" if self._pool_closed: raise RuntimeError("Connection pool has been closed") @@ -188,7 +192,60 @@ def _get_connection(self): conn.autocommit = True return conn + def _get_connection(self): + """Get a connection from the pool.""" + if self._pool_closed: + raise RuntimeError("Connection pool has been closed") + + max_retries = 3 + for attempt in range(max_retries): + try: + conn = self.connection_pool.getconn() + + # Check if connection is closed + if conn.closed != 0: + # Connection is closed, close it explicitly and try again + try: + conn.close() + except Exception as e: + logger.warning(f"Failed to close connection: {e}") + if attempt < max_retries - 1: + continue + else: + raise RuntimeError("Pool returned a closed connection") + + # Set autocommit for PolarDB compatibility + conn.autocommit = True + return conn + except Exception as e: + if attempt >= max_retries - 1: + raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e + continue + def _return_connection(self, connection): + """Return a connection to the pool.""" + if not self._pool_closed and connection: + try: + # Check if connection is closed + if hasattr(connection, "closed") and connection.closed != 0: + # Connection is closed, just close it and don't return to pool + try: + connection.close() + except Exception as e: + logger.warning(f"Failed to close connection: {e}") + return + + # Connection is valid, return to pool + self.connection_pool.putconn(connection) + except Exception as e: + # If putconn fails, close the connection + logger.warning(f"Failed to return connection to pool: {e}") + try: + connection.close() + except Exception as e: + logger.warning(f"Failed to close connection: {e}") + + def _return_connection_old(self, connection): """Return a connection to the pool.""" if not self._pool_closed and connection: self.connection_pool.putconn(connection) @@ -1834,7 +1891,7 @@ def export_graph( if include_embedding and embedding_json is not None: properties["embedding"] = embedding_json - nodes.append(self._parse_node(properties)) + nodes.append(self._parse_node(json.loads(properties[1]))) except Exception as e: logger.error(f"[EXPORT GRAPH - NODES] Exception: {e}", exc_info=True) diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index 53628d075..dea3cc1ab 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -260,7 +260,7 @@ def get_relevant_subgraph( center_id=core_id, depth=depth, center_status=center_status ) - if not subgraph["core_node"]: + if subgraph is None or not subgraph["core_node"]: logger.info(f"Skipping node {core_id} (inactive or not found).") continue @@ -281,9 +281,9 @@ def get_relevant_subgraph( {"id": core_id, "score": score, "core_node": core_node, "neighbors": neighbors} ) - top_core = cores[0] + top_core = cores[0] if cores else None return { - "core_id": top_core["id"], + "core_id": top_core["id"] if top_core else None, "nodes": list(all_nodes.values()), "edges": [{"source": f, "target": t, "type": ty} for (f, t, ty) in all_edges], }