Skip to content

Commit e7b7157

Browse files
M03ED authored and ImMohammad20000 committed
fix(node): health check edge cases handling
1 parent 2ac8d09 commit e7b7157

File tree

1 file changed

+64
-45
lines changed

1 file changed

+64
-45
lines changed

app/jobs/node_checker.py

Lines changed: 64 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -19,62 +19,47 @@
1919
logger = get_logger("node-checker")
2020

2121

22-
async def verify_node_backend_health(node: PasarGuardNode, node_name: str) -> Health:
22+
async def verify_node_backend_health(node: PasarGuardNode, node_name: str) -> tuple[Health, int | None, str | None]:
2323
"""
2424
Verify node health by checking backend stats.
25-
Returns updated health status.
25+
Returns (health, error_code, error_message) - error_code and error_message are None if no error occurred.
2626
"""
2727
current_health = await asyncio.wait_for(node.get_health(), timeout=10)
2828

2929
# Skip nodes that are not connected or invalid
3030
if current_health in (Health.NOT_CONNECTED, Health.INVALID):
31-
return current_health
31+
return current_health, None, None
3232

3333
try:
3434
await node.get_backend_stats()
3535
if current_health != Health.HEALTHY:
3636
await node.set_health(Health.HEALTHY)
3737
logger.debug(f"[{node_name}] Node health is HEALTHY")
38-
return Health.HEALTHY
38+
return Health.HEALTHY, None, None
39+
except NodeAPIError as e:
40+
logger.error(f"[{node_name}] Health check failed, setting health to BROKEN | Error: NodeAPIError(code={e.code}) - {e.detail}")
41+
try:
42+
await node.set_health(Health.BROKEN)
43+
return Health.BROKEN, e.code, e.detail
44+
except Exception as e_set_health:
45+
error_type_set = type(e_set_health).__name__
46+
logger.error(
47+
f"[{node_name}] Failed to set health to BROKEN | Error: {error_type_set} - {str(e_set_health)}"
48+
)
49+
return current_health, e.code, e.detail
3950
except Exception as e:
4051
error_type = type(e).__name__
41-
logger.error(f"[{node_name}] Health check failed, setting health to BROKEN | Error: {error_type} - {str(e)}")
52+
error_message = f"{error_type}: {str(e)}"
53+
logger.error(f"[{node_name}] Health check failed, setting health to BROKEN | Error: {error_message}")
4254
try:
4355
await node.set_health(Health.BROKEN)
44-
return Health.BROKEN
56+
return Health.BROKEN, None, error_message
4557
except Exception as e_set_health:
4658
error_type_set = type(e_set_health).__name__
4759
logger.error(
4860
f"[{node_name}] Failed to set health to BROKEN | Error: {error_type_set} - {str(e_set_health)}"
4961
)
50-
return current_health
51-
52-
53-
async def update_node_connection_status(node_id: int, node: PasarGuardNode):
54-
"""
55-
Update node connection status by getting backend stats and version info.
56-
"""
57-
try:
58-
await node.get_backend_stats()
59-
node_version, core_version = await asyncio.wait_for(node.get_versions(), timeout=10)
60-
async with GetDB() as db:
61-
await NodeOperation._update_single_node_status(
62-
db,
63-
node_id,
64-
NodeStatus.connected,
65-
xray_version=core_version,
66-
node_version=node_version,
67-
)
68-
except asyncio.TimeoutError:
69-
logger.warning(f"Node {node_id} get versions timed out, will retry on next check")
70-
return
71-
except NodeAPIError as e:
72-
if e.code > -3:
73-
async with GetDB() as db:
74-
await NodeOperation._update_single_node_status(db, node_id, NodeStatus.error, message=e.detail)
75-
if e.code > 0:
76-
async with GetDB() as db:
77-
await node_operator.connect_single_node(db, node_id)
62+
return current_health, None, error_message
7863

7964

8065
async def process_node_health_check(db_node: Node, node: PasarGuardNode):
@@ -84,34 +69,71 @@ async def process_node_health_check(db_node: Node, node: PasarGuardNode):
8469
2. Verify backend health
8570
3. Compare with database status
8671
4. Update status if needed
72+
73+
Timeout handling:
74+
- For timeout errors (code=-1): Don't reconnect, just wait for recovery
75+
- For other errors (code > -1): Reconnect (connection works but has another issue)
76+
- For NOT_CONNECTED/INVALID: Reconnect immediately
8777
"""
8878
if node is None:
8979
return
9080

9181
try:
92-
health = await verify_node_backend_health(node, db_node.name)
82+
health, error_code, error_message = await verify_node_backend_health(node, db_node.name)
9383
except asyncio.TimeoutError:
94-
if db_node.status == NodeStatus.connected:
95-
logger.warning(
96-
f"Node {db_node.id} ({db_node.name}) health check timed out but was previously connected, will retry"
97-
)
98-
return
84+
# Record timeout error in database but don't reconnect
85+
logger.warning(f"[{db_node.name}] Health check timed out")
9986
async with GetDB() as db:
10087
await NodeOperation._update_single_node_status(
10188
db, db_node.id, NodeStatus.error, message="Health check timeout"
10289
)
90+
return
10391
except NodeAPIError as e:
92+
# Record error in database
10493
async with GetDB() as db:
10594
await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=e.detail)
106-
107-
if node.requires_hard_reset() or health is None or health in (Health.NOT_CONNECTED, Health.INVALID):
95+
# For timeout errors (code=-1), don't reconnect - just wait for recovery
96+
if e.code == -1:
97+
logger.warning(f"[{db_node.name}] Health check timed out (NodeAPIError), waiting for recovery")
98+
return
99+
# For other errors, reconnect
108100
async with GetDB() as db:
109101
await node_operator.connect_single_node(db, db_node.id)
110102
return
111103

112104
# Skip nodes that are already healthy and connected
113105
if health == Health.HEALTHY and db_node.status == NodeStatus.connected:
114106
return
107+
108+
# Handle hard reset requirement
109+
if node.requires_hard_reset():
110+
async with GetDB() as db:
111+
await node_operator.connect_single_node(db, db_node.id)
112+
return
113+
114+
if health is Health.INVALID:
115+
logger.warning(f"[{db_node.name}] Node health is INVALID, ignoring...")
116+
return
117+
118+
# Handle NOT_CONNECTED - reconnect immediately
119+
if health is Health.NOT_CONNECTED:
120+
async with GetDB() as db:
121+
await node_operator.connect_single_node(db, db_node.id)
122+
return
123+
124+
# Handle BROKEN health
125+
if health == Health.BROKEN:
126+
# Record actual error in database
127+
async with GetDB() as db:
128+
await NodeOperation._update_single_node_status(
129+
db, db_node.id, NodeStatus.error, message=error_message
130+
)
131+
# Only reconnect for non-timeout errors (code > -1)
132+
if error_code is not None and error_code > -1:
133+
async with GetDB() as db:
134+
await node_operator.connect_single_node(db, db_node.id)
135+
# For timeout (code=-1 or None), just wait - don't reconnect
136+
return
115137

116138
# Update status for recovering nodes
117139
if db_node.status in (NodeStatus.connecting, NodeStatus.error) and health == Health.HEALTHY:
@@ -126,9 +148,6 @@ async def process_node_health_check(db_node: Node, node: PasarGuardNode):
126148
)
127149
return
128150

129-
# For all other cases, update connection status
130-
await update_node_connection_status(db_node.id, node)
131-
132151

133152
async def check_node_limits():
134153
"""

0 commit comments

Comments
 (0)