Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ def __init__(self, config: PolarDBGraphDBConfig):
user=user,
password=password,
dbname=self.db_name,
connect_timeout=60, # Connection timeout in seconds
keepalives_idle=40, # Seconds of inactivity before sending keepalive (should be < server idle timeout)
keepalives_interval=15, # Seconds between keepalive retries
keepalives_count=5, # Number of keepalive retries before considering connection dead
)

# Keep a reference to the pool for cleanup
Expand Down Expand Up @@ -179,7 +183,7 @@ def _get_config_value(self, key: str, default=None):
else:
return getattr(self.config, key, default)

def _get_connection(self):
def _get_connection_old(self):
"""Get a connection from the pool."""
if self._pool_closed:
raise RuntimeError("Connection pool has been closed")
Expand All @@ -188,7 +192,60 @@ def _get_connection(self):
conn.autocommit = True
return conn

def _get_connection(self):
"""Get a connection from the pool."""
if self._pool_closed:
raise RuntimeError("Connection pool has been closed")

max_retries = 3
for attempt in range(max_retries):
try:
conn = self.connection_pool.getconn()

# Check if connection is closed
if conn.closed != 0:
# Connection is closed, close it explicitly and try again
try:
conn.close()
except Exception as e:
logger.warning(f"Failed to close connection: {e}")
if attempt < max_retries - 1:
continue
else:
raise RuntimeError("Pool returned a closed connection")

# Set autocommit for PolarDB compatibility
conn.autocommit = True
return conn
except Exception as e:
if attempt >= max_retries - 1:
raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e
continue

def _return_connection(self, connection):
"""Return a connection to the pool."""
if not self._pool_closed and connection:
try:
# Check if connection is closed
if hasattr(connection, "closed") and connection.closed != 0:
# Connection is closed, just close it and don't return to pool
try:
connection.close()
except Exception as e:
logger.warning(f"Failed to close connection: {e}")
return

# Connection is valid, return to pool
self.connection_pool.putconn(connection)
except Exception as e:
# If putconn fails, close the connection
logger.warning(f"Failed to return connection to pool: {e}")
try:
connection.close()
except Exception as e:
logger.warning(f"Failed to close connection: {e}")

def _return_connection_old(self, connection):
"""Return a connection to the pool."""
if not self._pool_closed and connection:
self.connection_pool.putconn(connection)
Expand Down Expand Up @@ -1834,7 +1891,7 @@ def export_graph(
if include_embedding and embedding_json is not None:
properties["embedding"] = embedding_json

nodes.append(self._parse_node(properties))
nodes.append(self._parse_node(json.loads(properties[1])))

except Exception as e:
logger.error(f"[EXPORT GRAPH - NODES] Exception: {e}", exc_info=True)
Expand Down
6 changes: 3 additions & 3 deletions src/memos/memories/textual/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def get_relevant_subgraph(
center_id=core_id, depth=depth, center_status=center_status
)

if not subgraph["core_node"]:
if subgraph is None or not subgraph["core_node"]:
logger.info(f"Skipping node {core_id} (inactive or not found).")
continue

Expand All @@ -281,9 +281,9 @@ def get_relevant_subgraph(
{"id": core_id, "score": score, "core_node": core_node, "neighbors": neighbors}
)

top_core = cores[0]
top_core = cores[0] if cores else None
return {
"core_id": top_core["id"],
"core_id": top_core["id"] if top_core else None,
"nodes": list(all_nodes.values()),
"edges": [{"source": f, "target": t, "type": ty} for (f, t, ty) in all_edges],
}
Expand Down
Loading