Skip to content

Commit 88a8832

Browse files
committed
fix(user): revert cross-loop async db changes and stabilize reset/delete paths
1 parent 3398c7e commit 88a8832

File tree

3 files changed

+37
-94
lines changed

3 files changed

+37
-94
lines changed

app/db/crud/user.py

Lines changed: 14 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
from copy import deepcopy
32
from datetime import UTC, datetime, timedelta, timezone
43
from enum import Enum
@@ -9,7 +8,6 @@
98
from sqlalchemy.orm import joinedload, selectinload
109
from sqlalchemy.sql.functions import coalesce
1110

12-
from app.db.base import GetDB, IS_SQLITE
1311
from app.db.compiles_types import DateDiff
1412
from app.db.models import (
1513
Admin,
@@ -698,32 +696,12 @@ async def _delete_user_dependencies(db: AsyncSession, user_ids: list[int]):
698696
if not user_ids:
699697
return
700698

701-
statements = [
702-
delete(NodeUserUsage).where(NodeUserUsage.user_id.in_(user_ids)),
703-
delete(NotificationReminder).where(NotificationReminder.user_id.in_(user_ids)),
704-
delete(UserSubscriptionUpdate).where(UserSubscriptionUpdate.user_id.in_(user_ids)),
705-
delete(UserUsageResetLogs).where(UserUsageResetLogs.user_id.in_(user_ids)),
706-
delete(NextPlan).where(NextPlan.user_id.in_(user_ids)),
707-
users_groups_association.delete().where(users_groups_association.c.user_id.in_(user_ids)),
708-
]
709-
710-
if IS_SQLITE:
711-
for stmt in statements:
712-
await db.execute(stmt)
713-
return
714-
715-
async def execute_in_isolated_session(statement):
716-
async with GetDB() as isolated_db:
717-
await isolated_db.execute(statement)
718-
await isolated_db.commit()
719-
720-
results = await asyncio.gather(
721-
*(execute_in_isolated_session(stmt) for stmt in statements),
722-
return_exceptions=True,
723-
)
724-
for result in results:
725-
if isinstance(result, Exception):
726-
raise result
699+
await db.execute(delete(NodeUserUsage).where(NodeUserUsage.user_id.in_(user_ids)))
700+
await db.execute(delete(NotificationReminder).where(NotificationReminder.user_id.in_(user_ids)))
701+
await db.execute(delete(UserSubscriptionUpdate).where(UserSubscriptionUpdate.user_id.in_(user_ids)))
702+
await db.execute(delete(UserUsageResetLogs).where(UserUsageResetLogs.user_id.in_(user_ids)))
703+
await db.execute(delete(NextPlan).where(NextPlan.user_id.in_(user_ids)))
704+
await db.execute(users_groups_association.delete().where(users_groups_association.c.user_id.in_(user_ids)))
727705

728706

729707
async def remove_user(db: AsyncSession, db_user: User) -> User:
@@ -858,16 +836,12 @@ async def modify_user(db: AsyncSession, db_user: User, modify: UserModify) -> Us
858836
return db_user
859837

860838

861-
async def _reset_user_traffic_and_log(db: AsyncSession, db_user: User, *, reset_at: datetime | None = None) -> datetime:
839+
async def _reset_user_traffic_and_log(db: AsyncSession, db_user: User):
862840
"""Helper to reset user traffic and log the action."""
863-
if reset_at is None:
864-
reset_at = datetime.now(timezone.utc)
865-
866841
await db_user.awaitable_attrs.next_plan
867842
usage_log = UserUsageResetLogs(
868843
user_id=db_user.id,
869844
used_traffic_at_reset=db_user.used_traffic,
870-
reset_at=reset_at,
871845
)
872846
db.add(usage_log)
873847

@@ -885,13 +859,7 @@ async def clear_user_node_usages(db: AsyncSession, user_id: int, *, before: date
885859
await db.execute(stmt)
886860

887861

888-
async def reset_user_data_usage(
889-
db: AsyncSession,
890-
db_user: User,
891-
*,
892-
skip_node_usage_cleanup: bool = False,
893-
node_usage_cleanup_before: datetime | None = None,
894-
) -> User:
862+
async def reset_user_data_usage(db: AsyncSession, db_user: User) -> User:
895863
"""
896864
Resets the data usage of a user and logs the reset.
897865
@@ -902,9 +870,8 @@ async def reset_user_data_usage(
902870
Returns:
903871
User: The updated user object.
904872
"""
905-
reset_at = await _reset_user_traffic_and_log(db, db_user, reset_at=node_usage_cleanup_before)
906-
if not skip_node_usage_cleanup:
907-
await clear_user_node_usages(db, db_user.id, before=reset_at)
873+
await _reset_user_traffic_and_log(db, db_user)
874+
await clear_user_node_usages(db, db_user.id)
908875

909876
if db_user.status not in [UserStatus.expired, UserStatus.disabled]:
910877
db_user.status = UserStatus.active.value
@@ -927,8 +894,8 @@ async def bulk_reset_user_data_usage(db: AsyncSession, users: list[User]) -> lis
927894
list[User]: The updated list of user objects.
928895
"""
929896
for db_user in users:
930-
reset_at = await _reset_user_traffic_and_log(db, db_user)
931-
await clear_user_node_usages(db, db_user.id, before=reset_at)
897+
await _reset_user_traffic_and_log(db, db_user)
898+
await clear_user_node_usages(db, db_user.id)
932899
if db_user.status not in [UserStatus.expired, UserStatus.disabled]:
933900
db_user.status = UserStatus.active.value
934901
await db.commit()
@@ -991,8 +958,8 @@ async def reset_user_by_next(db: AsyncSession, db_user: User) -> User:
991958
db_user.proxy_settings = proxy_settings
992959
db_user.data_limit_reset_strategy = db_user.next_plan.user_template.data_limit_reset_strategy
993960

994-
reset_at = await _reset_user_traffic_and_log(db, db_user)
995-
await clear_user_node_usages(db, db_user.id, before=reset_at)
961+
await _reset_user_traffic_and_log(db, db_user)
962+
await clear_user_node_usages(db, db_user.id)
996963
db_user.status = UserStatus.active
997964

998965
await db.commit()

app/db/models.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
func,
2121
or_,
2222
)
23+
from sqlalchemy.ext.asyncio import async_object_session
2324
from sqlalchemy.ext.hybrid import hybrid_property
2425
from sqlalchemy.orm import Mapped, mapped_column, relationship
2526
from sqlalchemy.sql.expression import select, text
@@ -195,15 +196,28 @@ def last_traffic_reset_time(self):
195196
return self.usage_logs[-1].reset_at if self.usage_logs else self.created_at
196197

197198
async def inbounds(self) -> list[str]:
198-
"""Returns a flat list of all included inbound tags across all proxies"""
199+
"""Returns a flat list of all included inbound tags for enabled groups."""
200+
session = async_object_session(self)
201+
if session is not None:
202+
stmt = (
203+
select(ProxyInbound.tag)
204+
.select_from(users_groups_association)
205+
.join(Group, users_groups_association.c.groups_id == Group.id)
206+
.join(inbounds_groups_association, Group.id == inbounds_groups_association.c.group_id)
207+
.join(ProxyInbound, inbounds_groups_association.c.inbound_id == ProxyInbound.id)
208+
.where(users_groups_association.c.user_id == self.id, Group.is_disabled.is_(False))
209+
.distinct()
210+
)
211+
result = await session.execute(stmt)
212+
return list(result.scalars().all())
213+
214+
# Fallback for detached instances: use already-loaded attrs only.
199215
included_tags = set()
200-
for group in self.groups:
216+
for group in self.__dict__.get("groups") or []:
201217
if group.is_disabled:
202218
continue
203-
204-
await group.awaitable_attrs.inbounds
205-
for inbound in group.inbound_tags:
206-
included_tags.add(inbound)
219+
for inbound in group.__dict__.get("inbounds") or []:
220+
included_tags.add(inbound.tag)
207221
return list(included_tags)
208222

209223
@property

app/operation/user.py

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
from app import notification
1212
from app.db import AsyncSession
13-
from app.db.base import GetDB, IS_SQLITE
1413
from app.db.crud.admin import get_admin
1514
from app.db.crud.bulk import (
1615
reset_all_users_data_usage,
@@ -21,13 +20,11 @@
2120
from app.db.crud.user import (
2221
UsersSortingOptions,
2322
UsersSortingOptionsSimple,
24-
clear_user_node_usages,
2523
create_user,
2624
create_users_bulk,
2725
get_all_users_usages,
2826
get_existing_usernames,
2927
get_expired_users,
30-
get_user_by_id,
3128
get_user_lifetime_used_traffic,
3229
get_user_usages,
3330
get_users,
@@ -85,28 +82,6 @@ class UserOperation(BaseOperation):
8582
def _is_non_blocking_sync_operator(operator_type: OperatorType) -> bool:
8683
return operator_type in (OperatorType.API, OperatorType.WEB)
8784

88-
@staticmethod
89-
async def _sync_user_by_id(user_id: int) -> None:
90-
async with GetDB() as sync_db:
91-
db_user = await get_user_by_id(
92-
sync_db,
93-
user_id,
94-
load_admin=False,
95-
load_next_plan=False,
96-
load_usage_logs=False,
97-
load_groups=True,
98-
)
99-
if not db_user:
100-
return
101-
proto_user = await serialize_user(db_user)
102-
await sync_proto_user(proto_user)
103-
104-
@staticmethod
105-
async def _clear_user_node_usages_by_id(user_id: int, cleanup_before: dt) -> None:
106-
async with GetDB() as cleanup_db:
107-
await clear_user_node_usages(cleanup_db, user_id, before=cleanup_before)
108-
await cleanup_db.commit()
109-
11085
@staticmethod
11186
def _format_validation_errors(error: ValidationError) -> str:
11287
return "; ".join(
@@ -262,11 +237,8 @@ async def update_user(
262237
self, db: AsyncSession, db_user: User, *, include_lifetime_used_traffic: bool = True
263238
) -> UserNotificationResponse:
264239
if self._is_non_blocking_sync_operator(self.operator_type):
265-
if IS_SQLITE:
266-
proto_user = await serialize_user(db_user)
267-
schedule_sync_task(sync_proto_user(proto_user))
268-
else:
269-
schedule_sync_task(self._sync_user_by_id(db_user.id))
240+
proto_user = await serialize_user(db_user)
241+
schedule_sync_task(sync_proto_user(proto_user))
270242
else:
271243
await sync_user(db_user)
272244

@@ -345,17 +317,7 @@ async def remove_user(self, db: AsyncSession, username: str, admin: AdminDetails
345317
async def _reset_user_data_usage(self, db: AsyncSession, db_user: User, admin: AdminDetails):
346318
old_status = db_user.status
347319

348-
skip_node_usage_cleanup = self._is_non_blocking_sync_operator(self.operator_type) and not IS_SQLITE
349-
cleanup_before = dt.now(tz.utc)
350-
db_user = await reset_user_data_usage(
351-
db=db,
352-
db_user=db_user,
353-
skip_node_usage_cleanup=skip_node_usage_cleanup,
354-
node_usage_cleanup_before=cleanup_before,
355-
)
356-
if skip_node_usage_cleanup:
357-
schedule_sync_task(self._clear_user_node_usages_by_id(db_user.id, cleanup_before))
358-
320+
db_user = await reset_user_data_usage(db=db, db_user=db_user)
359321
user = await self.update_user(db, db_user, include_lifetime_used_traffic=False)
360322

361323
if user.status != old_status:

0 commit comments

Comments
 (0)