Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def _get_connection(self):

max_retries = 3
for attempt in range(max_retries):
conn = None
try:
conn = self.connection_pool.getconn()

Expand All @@ -216,8 +217,49 @@ def _get_connection(self):

# Set autocommit for PolarDB compatibility
conn.autocommit = True

# Test connection health with SELECT 1
try:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
except Exception as health_check_error:
# Connection is not usable, close it and try again
logger.warning(
f"Connection health check failed: {health_check_error}, closing connection and retrying..."
)
try:
conn.close()
except Exception as close_error:
logger.warning(f"Failed to close unhealthy connection: {close_error}")

# Return connection to pool if it's still valid
try:
self.connection_pool.putconn(conn, close=True)
except Exception as close_error:
logger.warning(f"Failed to connection_pool.putconn: {close_error}")

conn = None
if attempt < max_retries - 1:
continue
else:
raise RuntimeError(
f"Failed to get a healthy connection from pool after {max_retries} attempts: {health_check_error}"
) from health_check_error

# Connection is healthy, return it
return conn
except Exception as e:
# If we have a connection that failed, try to return it to pool
if conn is not None:
try:
self.connection_pool.putconn(conn, close=True)
except Exception as putconn_error:
logger.warning(
f"Failed to connection_pool.putconn to pool: {putconn_error}"
)

if attempt >= max_retries - 1:
raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e
continue
Expand Down Expand Up @@ -647,12 +689,16 @@ def add_edge(
self, source_id: str, target_id: str, type: str, user_name: str | None = None
) -> None:
if not source_id or not target_id:
logger.warning(f"Edge '{source_id}' and '{target_id}' are both None")
raise ValueError("[add_edge] source_id and target_id must be provided")

source_exists = self.get_node(source_id) is not None
target_exists = self.get_node(target_id) is not None

if not source_exists or not target_exists:
logger.warning(
"[add_edge] Source %s or target %s does not exist.", source_exists, target_exists
)
raise ValueError("[add_edge] source_id and target_id must be provided")

properties = {}
Expand Down
Loading