From 4c89f6a5afb257f9d90715ded91d69fc5b3c10d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Tue, 11 Nov 2025 19:28:29 +0800 Subject: [PATCH] add pool health --- src/memos/graph_dbs/polardb.py | 46 ++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 60902420..da163529 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -199,6 +199,7 @@ def _get_connection(self): max_retries = 3 for attempt in range(max_retries): + conn = None try: conn = self.connection_pool.getconn() @@ -216,8 +217,49 @@ def _get_connection(self): # Set autocommit for PolarDB compatibility conn.autocommit = True + + # Test connection health with SELECT 1 + try: + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + cursor.close() + except Exception as health_check_error: + # Connection is not usable, close it and try again + logger.warning( + f"Connection health check failed: {health_check_error}, closing connection and retrying..." + ) + try: + conn.close() + except Exception as close_error: + logger.warning(f"Failed to close unhealthy connection: {close_error}") + + # Return connection to pool if it's still valid + try: + self.connection_pool.putconn(conn, close=True) + except Exception as close_error: + logger.warning(f"Failed to connection_pool.putconn: {close_error}") + + conn = None + if attempt < max_retries - 1: + continue + else: + raise RuntimeError( + f"Failed to get a healthy connection from pool after {max_retries} attempts: {health_check_error}" + ) from health_check_error + + # Connection is healthy, return it return conn except Exception as e: + # If we have a connection that failed, try to return it to pool + if conn is not None: + try: + self.connection_pool.putconn(conn, close=True) + except Exception as putconn_error: + logger.warning( + f"Failed to connection_pool.putconn to pool: {putconn_error}" + ) + if attempt >= max_retries - 1: raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e continue @@ -647,12 +689,16 @@ def add_edge( self, source_id: str, target_id: str, type: str, user_name: str | None = None ) -> None: if not source_id or not target_id: + logger.warning(f"Edge '{source_id}' and '{target_id}' are both None") raise ValueError("[add_edge] source_id and target_id must be provided") source_exists = self.get_node(source_id) is not None target_exists = self.get_node(target_id) is not None if not source_exists or not target_exists: + logger.warning( + "[add_edge] Source %s or target %s does not exist.", source_exists, target_exists + ) raise ValueError("[add_edge] source_id and target_id must be provided") properties = {}