Skip to content

Commit d28d5ba

Browse files
committed
fix: Enhance core state management and cache synchronization
1 parent df8ae08 commit d28d5ba

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

app/core/manager.py

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -218,16 +218,18 @@ async def _update_core_local(self, db_core_config: CoreConfig):
218218
await self._persist_state()
219219

220220
async def _update_core_nats(self, db_core_config: CoreConfig):
221-
# Validate core before publishing
222-
# Keep local state in sync immediately, while still broadcasting via NATS.
221+
# Persist local state (and KV snapshot) before broadcasting.
222+
# This lets node workers refresh from KV and avoids reconnect races.
223+
await self._update_core_local(db_core_config)
224+
225+
# Validate payload before publishing the broadcast message.
223226
self.validate_core(
224227
db_core_config.config, db_core_config.exclude_inbound_tags, db_core_config.fallbacks_inbound_tags
225228
)
226229
try:
227230
await self._publish_invalidation({"action": "update", "core": self._core_payload_from_db(db_core_config)})
228231
except Exception as exc:
229232
self._logger.warning(f"Failed to publish core update via NATS: {exc}")
230-
await self._update_core_local(db_core_config)
231233

232234
async def update_core(self, db_core_config: CoreConfig):
233235
await self._update_core_impl(db_core_config)
@@ -244,11 +246,13 @@ async def _remove_core_local(self, core_id: int):
244246
await self._persist_state()
245247

246248
async def _remove_core_nats(self, core_id: int):
249+
# Persist local removal (and KV snapshot) before broadcasting.
250+
await self._remove_core_local(core_id)
251+
247252
try:
248253
await self._publish_invalidation({"action": "remove", "core_id": core_id})
249254
except Exception as exc:
250255
self._logger.warning(f"Failed to publish core remove via NATS: {exc}")
251-
await self._remove_core_local(core_id)
252256

253257
async def remove_core(self, core_id: int):
254258
await self._remove_core_impl(core_id)

app/node/worker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from PasarGuardNodeBridge.common.service_pb2 import User as ProtoUser
99

1010
from app import on_shutdown, on_startup
11+
from app.core.manager import core_manager
1112
from app.db import GetDB
1213
from app.db.crud.node import get_node_by_id, get_nodes
1314
from app.models.node import NodeCoreUpdate, NodeGeoFilesUpdate
@@ -170,12 +171,16 @@ async def _connect_node(self, data: dict):
170171
node_id = data.get("node_id")
171172
if not node_id:
172173
return
174+
# Refresh from KV before connecting to avoid stale core cache races.
175+
await core_manager._reload_from_cache()
173176
async with GetDB() as db:
174177
await self._node_operator.connect_single_node(db, node_id)
175178

176179
async def _connect_nodes_bulk(self, data: dict):
177180
node_ids = data.get("node_ids")
178181
core_id = data.get("core_id")
182+
# Refresh from KV before bulk reconnect to avoid stale core cache races.
183+
await core_manager._reload_from_cache()
179184
async with GetDB() as db:
180185
if node_ids:
181186
nodes, _ = await get_nodes(db, ids=node_ids)

0 commit comments

Comments
 (0)