Skip to content

Commit b1b2623

Browse files
committed
refactor(jobs): node_health_check
1 parent 74cfe5b commit b1b2623

File tree

2 files changed

+22
-32
lines changed

2 files changed

+22
-32
lines changed

app/jobs/node_checker.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22

3-
from PasarGuardNodeBridge import NodeAPIError, PasarGuardNode
3+
from PasarGuardNodeBridge import NodeAPIError, PasarGuardNode, Health
44

55
from app import on_shutdown, on_startup, scheduler
66
from app.db import GetDB
@@ -32,13 +32,29 @@ async def check_node(id: int, node: PasarGuardNode):
3232
if e.code > 0:
3333
await node_operator.connect_node(node_id=id)
3434

35-
broken_nodes, not_connected_nodes = await node_manager.get_nodes_by_health_status()
35+
async def check_health(db_node: Node, node: PasarGuardNode):
36+
if node is None:
37+
return
38+
try:
39+
health = await asyncio.wait_for(node.get_health(), timeout=10)
40+
except (asyncio.TimeoutError, NodeAPIError):
41+
await node_operator.update_node_status(db_node.id, NodeStatus.error, err="Get health timeout")
42+
43+
if db_node.status in (NodeStatus.connecting, NodeStatus.error) and health is Health.HEALTHY:
44+
await node_operator.update_node_status(db_node.id, NodeStatus.connected)
45+
46+
elif db_node.status in (NodeStatus.connecting, NodeStatus.error) and health is Health.NOT_CONNECTED:
47+
await node_operator.connect_node(node_id=db_node.id)
3648

37-
check_tasks = [check_node(id, node) for id, node in broken_nodes]
38-
connect_tasks = [node_operator.connect_node(id) for id, _ in not_connected_nodes]
49+
elif db_node.status == NodeStatus.connected and health is not Health.HEALTHY:
50+
await check_node(db_node.id, node)
51+
52+
async with GetDB() as db:
53+
db_nodes = await get_nodes(db=db, enabled=True)
54+
dict_nodes = await node_manager.get_nodes()
3955

40-
# Use return_exceptions=True to prevent one failed node from stopping others
41-
await asyncio.gather(*check_tasks + connect_tasks, return_exceptions=True)
56+
check_tasks = [check_health(db_node, dict_nodes[db_node.id]) for db_node in db_nodes]
57+
await asyncio.gather(*check_tasks, return_exceptions=True)
4258

4359

4460
@on_startup

app/node/__init__.py

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -86,32 +86,6 @@ async def get_not_connected_nodes(self) -> list[tuple[int, PasarGuardNode]]:
8686
]
8787
return nodes
8888

89-
async def get_nodes_by_health_status(
90-
self,
91-
) -> tuple[list[tuple[int, PasarGuardNode]], list[tuple[int, PasarGuardNode]]]:
92-
"""Optimized method to get both broken and not_connected nodes in single lock acquisition"""
93-
async with self._lock.reader_lock:
94-
if not self._nodes:
95-
return [], []
96-
97-
health_tasks = [node.get_health() for node in self._nodes.values()]
98-
health_results = await asyncio.gather(*health_tasks, return_exceptions=True)
99-
100-
broken_nodes = []
101-
not_connected_nodes = []
102-
103-
for (node_id, node), health in zip(self._nodes.items(), health_results):
104-
if isinstance(health, Exception):
105-
continue
106-
if health == Health.BROKEN:
107-
broken_nodes.append((node_id, node))
108-
elif health == Health.NOT_CONNECTED:
109-
not_connected_nodes.append((node_id, node))
110-
else:
111-
continue
112-
113-
return broken_nodes, not_connected_nodes
114-
11589
async def update_user(self, user: UserResponse, inbounds: list[str] = None):
11690
proto_user = serialize_user_for_node(user.id, user.username, user.proxy_settings.dict(), inbounds)
11791

0 commit comments

Comments
 (0)