Skip to content

Commit b30d7e0

Browse files
committed
fix: avoid async DB races in background bulk sync tasks
1 parent 667eb4a commit b30d7e0

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

app/operation/admin.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
from app.db.crud.bulk import activate_all_disabled_users, disable_all_active_users
2121
from app.db.crud.user import get_users, remove_users
2222
from app.models.admin import AdminCreate, AdminDetails, AdminModify, AdminSimple, AdminsResponse, AdminsSimpleResponse
23-
from app.node.sync import schedule_sync_task, sync_remove_users, sync_users, remove_user as sync_remove_user
23+
from app.node.sync import schedule_sync_task, sync_remove_users, sync_users, sync_proto_users, remove_user as sync_remove_user
24+
from app.node.user import serialize_users_for_node
2425
from app.models.stats import Period, UserUsageStatsList
2526
from app.operation import BaseOperation, OperatorType
2627
from app.operation.user import UserOperation
@@ -171,7 +172,8 @@ async def disable_all_active_users(self, db: AsyncSession, username: str, admin:
171172

172173
users = await get_users(db, admin=db_admin)
173174
if self._is_non_blocking_sync_operator(self.operator_type):
174-
schedule_sync_task(sync_users(users))
175+
proto_users = await serialize_users_for_node(users)
176+
schedule_sync_task(sync_proto_users(proto_users))
175177
else:
176178
await sync_users(users)
177179

@@ -188,7 +190,8 @@ async def activate_all_disabled_users(self, db: AsyncSession, username: str, adm
188190

189191
users = await get_users(db, admin=db_admin)
190192
if self._is_non_blocking_sync_operator(self.operator_type):
191-
schedule_sync_task(sync_users(users))
193+
proto_users = await serialize_users_for_node(users)
194+
schedule_sync_task(sync_proto_users(proto_users))
192195
else:
193196
await sync_users(users)
194197

app/operation/user.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@
6363
UserSubscriptionUpdateList,
6464
)
6565
from app.node.sync import remove_user as sync_remove_user
66-
from app.node.sync import schedule_sync_task, sync_proto_user, sync_user, sync_users
67-
from app.node.user import serialize_user
66+
from app.node.sync import schedule_sync_task, sync_proto_user, sync_proto_users, sync_user, sync_users
67+
from app.node.user import serialize_user, serialize_users_for_node
6868
from app.operation import BaseOperation, OperatorType
6969
from app.settings import subscription_settings
7070
from app.utils.jwt import create_subscription_token
@@ -746,7 +746,8 @@ def builder(username: str):
746746
async def bulk_modify_expire(self, db: AsyncSession, bulk_model: BulkUser):
747747
users, users_count = await update_users_expire(db, bulk_model)
748748
if self._is_non_blocking_sync_operator(self.operator_type):
749-
schedule_sync_task(sync_users(users))
749+
proto_users = await serialize_users_for_node(users)
750+
schedule_sync_task(sync_proto_users(proto_users))
750751
else:
751752
await sync_users(users)
752753

@@ -757,7 +758,8 @@ async def bulk_modify_expire(self, db: AsyncSession, bulk_model: BulkUser):
757758
async def bulk_modify_datalimit(self, db: AsyncSession, bulk_model: BulkUser):
758759
users, users_count = await update_users_datalimit(db, bulk_model)
759760
if self._is_non_blocking_sync_operator(self.operator_type):
760-
schedule_sync_task(sync_users(users))
761+
proto_users = await serialize_users_for_node(users)
762+
schedule_sync_task(sync_proto_users(proto_users))
761763
else:
762764
await sync_users(users)
763765

@@ -768,7 +770,8 @@ async def bulk_modify_datalimit(self, db: AsyncSession, bulk_model: BulkUser):
768770
async def bulk_modify_proxy_settings(self, db: AsyncSession, bulk_model: BulkUsersProxy):
769771
users, users_count = await update_users_proxy_settings(db, bulk_model)
770772
if self._is_non_blocking_sync_operator(self.operator_type):
771-
schedule_sync_task(sync_users(users))
773+
proto_users = await serialize_users_for_node(users)
774+
schedule_sync_task(sync_proto_users(proto_users))
772775
else:
773776
await sync_users(users)
774777

0 commit comments

Comments
 (0)